12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
73278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458345934603461346234633464346534663467346834693470347134723473347434753476347734783479348034813482348334843485348634873488348934903491349234933494349534963497349834993500350135023503350435053506350735083509351035113512351335143515351635173518351935203521352235233524352535263527352835293530353135323533353435353536353735383539354035413542354335443545354635473548354935503551355235533554355535563557355835593560356135623563356435653566356735683569357035713572357335743575357635773578357935803581358235833584358535863587358835893590359135923593359435953596359735983599360036013602360336043605360636073608360936103611361236133614361536163617361836193620362136223623362436253626362736283629363036313632363336343635363636373638363936403641364236433644364536463647364836493650365136523653365436553656365736583659366036613662366336643665366636673668366936703671367236733674367536763677367836793680368136823683368436853686368736883689369036913692369336943695369636973698369937003701370237033704370537063707370837093710371137123713371437153716371737183719372037213722372337243725372637273728372937303731373237333734373537363737373837393740374137423743374437453746374737483749375037513752375337543755375637573758375937603761376237633764376537663767376837693770377137723773377437753776377
73778377937803781378237833784378537863787378837893790379137923793379437953796379737983799380038013802380338043805380638073808380938103811381238133814381538163817381838193820382138223823382438253826382738283829383038313832383338343835383638373838383938403841384238433844384538463847384838493850385138523853385438553856385738583859386038613862386338643865386638673868386938703871387238733874387538763877387838793880388138823883388438853886388738883889389038913892389338943895389638973898389939003901390239033904390539063907390839093910391139123913391439153916391739183919392039213922392339243925392639273928392939303931393239333934393539363937393839393940394139423943394439453946394739483949395039513952395339543955395639573958395939603961396239633964396539663967396839693970397139723973397439753976397739783979398039813982398339843985398639873988398939903991399239933994399539963997399839994000400140024003400440054006400740084009401040114012401340144015401640174018401940204021402240234024402540264027402840294030403140324033403440354036403740384039404040414042404340444045404640474048404940504051405240534054405540564057405840594060406140624063406440654066406740684069407040714072407340744075407640774078407940804081408240834084408540864087408840894090409140924093409440954096409740984099410041014102410341044105410641074108410941104111411241134114411541164117411841194120412141224123412441254126412741284129413041314132413341344135413641374138413941404141414241434144414541464147414841494150415141524153415441554156415741584159416041614162416341644165416641674168416941704171417241734174417541764177417841794180418141824183418441854186418741884189419041914192419341944195419641974198419942004201420242034204420542064207420842094210421142124213421442154216421742184219422042214222422342244225422642274228422942304231423242334234423542364237423842394240424142424243424442454246424742484249425042514252425342544255425642574258425942604261426242634264426542664267426842694270427142724273427442754276427
742784279428042814282428342844285428642874288428942904291429242934294429542964297429842994300430143024303430443054306430743084309431043114312431343144315431643174318431943204321432243234324432543264327432843294330433143324333433443354336433743384339434043414342434343444345434643474348434943504351435243534354435543564357435843594360436143624363436443654366436743684369437043714372437343744375437643774378437943804381438243834384438543864387438843894390439143924393439443954396439743984399440044014402440344044405440644074408440944104411441244134414441544164417441844194420442144224423442444254426442744284429443044314432443344344435443644374438443944404441444244434444444544464447444844494450445144524453445444554456445744584459446044614462446344644465446644674468446944704471447244734474447544764477447844794480448144824483448444854486448744884489449044914492449344944495449644974498449945004501450245034504450545064507450845094510451145124513451445154516451745184519452045214522452345244525452645274528452945304531453245334534453545364537453845394540454145424543454445454546454745484549455045514552455345544555455645574558455945604561456245634564456545664567456845694570457145724573457445754576457745784579458045814582458345844585458645874588458945904591459245934594459545964597459845994600460146024603460446054606460746084609461046114612461346144615461646174618461946204621462246234624462546264627462846294630463146324633463446354636463746384639464046414642464346444645464646474648464946504651465246534654465546564657465846594660466146624663466446654666466746684669467046714672467346744675467646774678467946804681468246834684468546864687468846894690469146924693469446954696469746984699470047014702470347044705470647074708470947104711471247134714471547164717471847194720472147224723472447254726472747284729473047314732473347344735473647374738473947404741474247434744474547464747474847494750 |
- # -*- coding: utf-8 -*-
- '''
- Tests for the file state
- '''
- # Import Python libs
- from __future__ import absolute_import, print_function, unicode_literals
- import errno
- import logging
- import os
- import re
- import sys
- import shutil
- import stat
- import tempfile
- import textwrap
- import filecmp
- log = logging.getLogger(__name__)
- # Import Salt Testing libs
- from tests.support.case import ModuleCase
- from tests.support.unit import skipIf
- from tests.support.paths import BASE_FILES, FILES, TMP, TMP_STATE_TREE
- from tests.support.helpers import (
- destructiveTest,
- skip_if_not_root,
- with_system_user_and_group,
- with_tempdir,
- with_tempfile,
- Webserver,
- destructiveTest,
- dedent,
- )
- from tests.support.mixins import SaltReturnAssertsMixin
- # Import Salt libs
- import salt.utils.data
- import salt.utils.files
- import salt.utils.json
- import salt.utils.path
- import salt.utils.platform
- import salt.utils.stringutils
- import salt.serializers.configparser
- from salt.utils.versions import LooseVersion as _LooseVersion
- HAS_PWD = True
- try:
- import pwd
- except ImportError:
- HAS_PWD = False
- HAS_GRP = True
- try:
- import grp
- except ImportError:
- HAS_GRP = False
- # Import 3rd-party libs
- from salt.ext import six
- from salt.ext.six.moves import range # pylint: disable=import-error,redefined-builtin
# Evaluated once at import time; the tests below branch on it.
IS_WINDOWS = salt.utils.platform.is_windows()

# Small binary payload; the bytes begin with the GIF89a signature.
BINARY_FILE = b'GIF89a\x01\x00\x01\x00\x80\x00\x00\x05\x04\x04\x00\x00\x00,\x00\x00\x00\x00\x01\x00\x01\x00\x00\x02\x02D\x01\x00;'

# Files removed again by FileTest.tearDown; their location depends on the
# platform.
if IS_WINDOWS:
    FILEPILLAR, FILEPILLARDEF, FILEPILLARGIT = (
        'C:\\Windows\\Temp\\filepillar-python',
        'C:\\Windows\\Temp\\filepillar-defaultvalue',
        'C:\\Windows\\Temp\\filepillar-bar',
    )
else:
    FILEPILLAR, FILEPILLARDEF, FILEPILLARGIT = (
        '/tmp/filepillar-python',
        '/tmp/filepillar-defaultvalue',
        '/tmp/filepillar-bar',
    )
def _test_managed_file_mode_keep_helper(testcase, local=False):
    '''
    Shared body for the "mode: keep" tests of file.managed.

    Manages ``TMP/scene33`` from the ``grail/scene33`` fixture, first with an
    explicit mode and then twice with ``mode='keep'`` after changing the mode
    of the source file, checking each time that the managed file ends up with
    the source file's mode.

    testcase
        The running test case; it must provide ``run_function``,
        ``run_state`` and the assertion helpers used below.

    local
        When True the source is passed as a filesystem path, otherwise as a
        ``salt://`` URL.
    '''
    name = os.path.join(TMP, 'scene33')
    grail_fs_path = os.path.join(BASE_FILES, 'grail', 'scene33')
    grail = grail_fs_path if local else 'salt://grail/scene33'

    # Remember the fixture's current mode so that the file can be put back
    # the way it was found once the test is done.
    grail_fs_mode = int(testcase.run_function('file.get_mode', [grail_fs_path]), 8)

    initial_mode = 0o770
    # Two different modes are applied in turn: should the fixture already
    # carry the first one, the second pass still forces an observable change.
    keep_modes = (0o600, 0o644)

    # Start from an explicit mode so that a later "keep" run really has to
    # change the permissions of the managed file.
    ret = testcase.run_state(
        'file.managed',
        name=name,
        mode=oct(initial_mode),
        source=grail,
    )

    if IS_WINDOWS:
        # The state is expected to fail on Windows; nothing further to check.
        testcase.assertSaltFalseReturn(ret)
        return

    testcase.assertSaltTrueReturn(ret)

    try:
        for source_mode in keep_modes:
            # Update the mode of the source file, then re-apply with "keep".
            os.chmod(grail_fs_path, source_mode)
            ret = testcase.run_state(
                'file.managed',
                name=name,
                mode='keep',
                source=grail,
            )
            testcase.assertSaltTrueReturn(ret)
            managed_mode = stat.S_IMODE(os.stat(name).st_mode)
            testcase.assertEqual(oct(managed_mode), oct(source_mode))
    finally:
        # Restore the original mode of the fixture, pass or fail.
        os.chmod(grail_fs_path, grail_fs_mode)
- class FileTest(ModuleCase, SaltReturnAssertsMixin):
- '''
- Validate the file state
- '''
def tearDown(self):
    '''
    Remove the user and the files that earlier tests may have created
    '''
    user = 'salt'
    users = self.run_function('user.list_users')
    # Exact membership test: looking for the name inside str(list) would
    # also match any other account whose name merely contains "salt".
    if isinstance(users, (list, tuple)) and user in users:
        self.run_function('user.delete', [user])

    for path in (FILEPILLAR, FILEPILLARDEF, FILEPILLARGIT):
        try:
            os.remove(path)
        except OSError as exc:
            # A file that was never created is fine; anything else is
            # logged but must not fail the teardown.
            if exc.errno != errno.ENOENT:
                log.error('Failed to remove %s: %s', path, exc)
def test_symlink(self):
    '''
    file.symlink creates a link
    '''
    link_path = os.path.join(TMP, 'symlink')
    target = os.path.join(TMP, 'target')

    if IS_WINDOWS:
        # Windows must have a source directory to link to ...
        if not os.path.isdir(target):
            os.mkdir(target)
        # ... and cannot create a symlink if it already exists.
        if self.run_function('file.is_link', [link_path]):
            self.run_function('file.remove', [link_path])

    self.assertSaltTrueReturn(
        self.run_state('file.symlink', name=link_path, target=target)
    )
def test_test_symlink(self):
    '''
    file.symlink run with test=True
    '''
    state_kwargs = {
        'name': os.path.join(TMP, 'symlink2'),
        'target': os.path.join(TMP, 'target'),
        'test': True,
    }
    result = self.run_state('file.symlink', **state_kwargs)
    self.assertSaltNoneReturn(result)
def test_absent_file(self):
    '''
    file.absent removes a regular file
    '''
    victim = os.path.join(TMP, 'file_to_kill')
    # Create the file that the state is supposed to delete.
    with salt.utils.files.fopen(victim, 'w+') as handle:
        handle.write('killme')

    self.assertSaltTrueReturn(self.run_state('file.absent', name=victim))
    self.assertFalse(os.path.isfile(victim))
def test_absent_dir(self):
    '''
    file.absent removes a directory
    '''
    victim = os.path.join(TMP, 'dir_to_kill')
    # The directory may be left behind from an earlier run; only create it
    # when it is missing so that this does not fail the test.
    if not os.path.isdir(victim):
        os.makedirs(victim)

    self.assertSaltTrueReturn(self.run_state('file.absent', name=victim))
    self.assertFalse(os.path.isdir(victim))
def test_absent_link(self):
    '''
    file.absent removes a symlink
    '''
    link_path = os.path.join(TMP, 'link_to_kill')
    target = '{0}.tgt'.format(link_path)

    # Windows must have a source directory to link to
    if IS_WINDOWS and not os.path.isdir(target):
        os.mkdir(target)

    # Make sure there is a link for the state to remove.
    link_exists = self.run_function('file.is_link', [link_path])
    if not link_exists:
        self.run_function('file.symlink', [target, link_path])

    result = self.run_state('file.absent', name=link_path)

    try:
        self.assertSaltTrueReturn(result)
        self.assertFalse(self.run_function('file.is_link', [link_path]))
    finally:
        # Clean up the link ourselves if the state did not remove it.
        if self.run_function('file.is_link', [link_path]):
            self.run_function('file.remove', [link_path])
@with_tempfile()
def test_test_absent(self, name):
    '''
    file.absent run with test=True leaves the file in place
    '''
    with salt.utils.files.fopen(name, 'w+') as handle:
        handle.write('killme')

    result = self.run_state('file.absent', test=True, name=name)

    self.assertSaltNoneReturn(result)
    # The file must still be there after the test-mode run.
    self.assertTrue(os.path.isfile(name))
def test_managed(self):
    '''
    file.managed copies the source file to the managed path
    '''
    name = os.path.join(TMP, 'grail_scene33')
    ret = self.run_state(
        'file.managed', name=name, source='salt://grail/scene33'
    )
    # Check the state result first: if the state failed, the managed file
    # may not exist and reading it would hide the real failure behind an
    # I/O error.
    self.assertSaltTrueReturn(ret)

    src = os.path.join(BASE_FILES, 'grail', 'scene33')
    with salt.utils.files.fopen(src, 'r') as fp_:
        master_data = fp_.read()
    with salt.utils.files.fopen(name, 'r') as fp_:
        minion_data = fp_.read()
    self.assertEqual(master_data, minion_data)
def test_managed_file_mode(self):
    '''
    file.managed applies the requested file permissions
    '''
    desired_mode = 0o770
    name = os.path.join(TMP, 'grail_scene33')
    ret = self.run_state(
        'file.managed', name=name, mode='0770', source='salt://grail/scene33'
    )

    if IS_WINDOWS:
        # The state is expected to reject the mode argument on Windows.
        expected = 'The \'mode\' option is not supported on Windows'
        self.assertEqual(ret[list(ret)[0]]['comment'], expected)
        self.assertSaltFalseReturn(ret)
        return

    # Check the state result before touching the file so that a failed
    # state is reported as such instead of as an OSError from os.stat.
    self.assertSaltTrueReturn(ret)

    resulting_mode = stat.S_IMODE(os.stat(name).st_mode)
    self.assertEqual(oct(desired_mode), oct(resulting_mode))
def test_managed_file_mode_keep(self):
    '''
    Test using "mode: keep" in a file.managed state, with a salt:// URL as
    the source.
    '''
    # The shared helper does all the work; local=False selects the salt://
    # source.
    _test_managed_file_mode_keep_helper(self, local=False)
def test_managed_file_mode_keep_local_source(self):
    '''
    Test using "mode: keep" in a file.managed state, with a local file path
    as the source.
    '''
    # Same shared helper as the salt:// variant; local=True selects the
    # filesystem path of the fixture.
    _test_managed_file_mode_keep_helper(self, local=True)
def test_managed_file_mode_file_exists_replace(self):
    '''
    file.managed on an existing file with replace=True changes the mode
    '''
    initial_mode = 0o770
    desired_mode = 0o600
    name = os.path.join(TMP, 'grail_scene33')
    source = 'salt://grail/scene33'

    # First run: create the file with the initial mode.
    ret = self.run_state(
        'file.managed', name=name, mode=oct(initial_mode), source=source
    )

    if IS_WINDOWS:
        expected = 'The \'mode\' option is not supported on Windows'
        first_result = ret[list(ret)[0]]
        self.assertEqual(first_result['comment'], expected)
        self.assertSaltFalseReturn(ret)
        return

    mode_after_create = stat.S_IMODE(os.stat(name).st_mode)
    self.assertEqual(oct(initial_mode), oct(mode_after_create))

    # Second run: same file, replace=True and a different mode.
    ret = self.run_state(
        'file.managed',
        name=name,
        replace=True,
        mode=oct(desired_mode),
        source=source,
    )
    mode_after_replace = stat.S_IMODE(os.stat(name).st_mode)
    self.assertEqual(oct(desired_mode), oct(mode_after_replace))
    self.assertSaltTrueReturn(ret)
def test_managed_file_mode_file_exists_noreplace(self):
    '''
    file.managed on an existing file with replace=False still changes the
    mode
    '''
    initial_mode = 0o770
    desired_mode = 0o600
    name = os.path.join(TMP, 'grail_scene33')
    source = 'salt://grail/scene33'

    # First run: lay the file down with the initial mode.
    ret = self.run_state(
        'file.managed',
        name=name,
        replace=True,
        mode=oct(initial_mode),
        source=source,
    )

    if IS_WINDOWS:
        expected = 'The \'mode\' option is not supported on Windows'
        first_result = ret[list(ret)[0]]
        self.assertEqual(first_result['comment'], expected)
        self.assertSaltFalseReturn(ret)
        return

    # Second run: contents are not to be replaced, but the mode differs.
    ret = self.run_state(
        'file.managed',
        name=name,
        replace=False,
        mode=oct(desired_mode),
        source=source,
    )
    final_mode = stat.S_IMODE(os.stat(name).st_mode)
    self.assertEqual(oct(desired_mode), oct(final_mode))
    self.assertSaltTrueReturn(ret)
def test_managed_file_with_grains_data(self):
    '''
    Grains data can be rendered into a managed file.
    '''
    grain_path = os.path.join(TMP, 'file-grain-test')
    self.run_function(
        'state.sls',
        ['file-grainget'],
        pillar={'grain_path': grain_path},
    )
    self.assertTrue(os.path.exists(grain_path))

    with salt.utils.files.fopen(grain_path, 'r') as handle:
        first_line = handle.readlines()[0]

    # The expected line ending depends on the platform.
    pattern = '^minion\r\n' if IS_WINDOWS else '^minion\n'
    self.assertTrue(re.match(pattern, first_line))
def test_managed_file_with_pillar_sls(self):
    '''
    An sls file that uses pillar data applies cleanly and creates the
    file.
    '''
    result = self.run_function('state.sls', ['file-pillarget'])
    self.assertSaltTrueReturn(result)

    # Check to make sure the file was created
    self.assertTrue(self.run_function('file.file_exists', [FILEPILLAR]))
def test_managed_file_with_pillardefault_sls(self):
    '''
    An sls file that falls back to the pillar.get default value, because
    the pillar data is not available, applies cleanly and creates the file.
    '''
    result = self.run_function('state.sls', ['file-pillardefaultget'])
    self.assertSaltTrueReturn(result)

    # Check to make sure the file was created
    self.assertTrue(self.run_function('file.file_exists', [FILEPILLARDEF]))
@skip_if_not_root
def test_managed_dir_mode(self):
    '''
    file.managed with makedirs creates the parent directory with the mode
    given as dir_mode and with the requested owner
    '''
    desired_mode = 0o777
    desired_owner = 'nobody'
    parent_dir = os.path.join(TMP, 'a')
    name = os.path.join(parent_dir, 'managed_dir_mode_test_file')

    ret = self.run_state(
        'file.managed',
        name=name,
        source='salt://grail/scene33',
        mode=600,
        makedirs=True,
        user=desired_owner,
        dir_mode=oct(desired_mode),
    )

    if IS_WINDOWS:
        expected = 'The \'mode\' option is not supported on Windows'
        first_result = ret[list(ret)[0]]
        self.assertEqual(first_result['comment'], expected)
        self.assertSaltFalseReturn(ret)
        return

    # Inspect the directory that makedirs had to create.
    resulting_mode = stat.S_IMODE(os.stat(parent_dir).st_mode)
    owner_uid = os.stat(parent_dir).st_uid
    resulting_owner = pwd.getpwuid(owner_uid).pw_name

    self.assertEqual(oct(desired_mode), oct(resulting_mode))
    self.assertSaltTrueReturn(ret)
    self.assertEqual(desired_owner, resulting_owner)
def test_test_managed(self):
    '''
    file.managed run with test=True does not create the file
    '''
    target = os.path.join(TMP, 'grail_not_not_scene33')
    result = self.run_state(
        'file.managed',
        name=target,
        source='salt://grail/scene33',
        test=True,
    )
    self.assertSaltNoneReturn(result)
    self.assertFalse(os.path.isfile(target))
def test_managed_show_changes_false(self):
    '''
    file.managed with show_changes=False reports a placeholder in place of
    the diff
    '''
    target = os.path.join(TMP, 'grail_not_scene33')
    # Seed the file with other content so that the state has to change it.
    with salt.utils.files.fopen(target, 'wb') as handle:
        handle.write(b'test_managed_show_changes_false\n')

    result = self.run_state(
        'file.managed',
        name=target,
        source='salt://grail/scene33',
        show_changes=False,
    )

    state_changes = next(six.itervalues(result))['changes']
    self.assertEqual('<show_changes=False>', state_changes['diff'])
def test_managed_show_changes_true(self):
    '''
    file.managed without show_changes includes a diff in its changes
    '''
    target = os.path.join(TMP, 'grail_not_scene33')
    # Seed the file with other content so that the state has to change it.
    with salt.utils.files.fopen(target, 'wb') as handle:
        handle.write(b'test_managed_show_changes_false\n')

    result = self.run_state(
        'file.managed',
        name=target,
        source='salt://grail/scene33',
    )

    state_changes = next(six.itervalues(result))['changes']
    self.assertIn('diff', state_changes)
@skipIf(IS_WINDOWS, 'Don\'t know how to fix for Windows')
def test_managed_escaped_file_path(self):
    '''
    file.managed test that 'salt://|' protects unusual characters in file path
    '''
    # A temporary file whose name is full of unusual characters; it serves
    # as the managed path, and a file of the same name under BASE_FILES
    # serves as the source.
    funny_file = salt.utils.files.mkstemp(prefix='?f!le? n@=3&', suffix='.file type')
    funny_file_name = os.path.split(funny_file)[1]
    funny_url = 'salt://|' + funny_file_name
    funny_url_path = os.path.join(BASE_FILES, funny_file_name)

    state_name = 'funny_file'
    state_file_name = state_name + '.sls'
    state_file = os.path.join(BASE_FILES, state_file_name)
    state_key = 'file_|-{0}_|-{0}_|-managed'.format(funny_file)

    self.addCleanup(os.remove, state_file)
    self.addCleanup(os.remove, funny_file)
    self.addCleanup(os.remove, funny_url_path)

    # Create an empty source file for the state to fetch.
    with salt.utils.files.fopen(funny_url_path, 'w'):
        pass

    # The SLS is YAML, so the nesting below is significant: the state
    # function sits under the ID and its arguments under the function.
    with salt.utils.files.fopen(state_file, 'w') as fp_:
        fp_.write(textwrap.dedent('''\
        {0}:
          file.managed:
            - source: {1}
            - makedirs: True
        '''.format(funny_file, funny_url)))

    ret = self.run_function('state.sls', [state_name])
    self.assertTrue(ret[state_key]['result'])
def test_managed_contents(self):
    '''
    test file.managed with contents that is a boolean, string, integer,
    float, list, and dictionary
    '''
    state_name = 'file-FileTest-test_managed_contents'
    state_filename = state_name + '.sls'
    state_file = os.path.join(BASE_FILES, state_filename)

    # One temp file and one expected return key per content type
    managed_files = {}
    state_keys = {}
    for typ in ('bool', 'str', 'int', 'float', 'list', 'dict'):
        managed_files[typ] = salt.utils.files.mkstemp()
        state_keys[typ] = 'file_|-{0} file_|-{1}_|-managed'.format(typ, managed_files[typ])

    try:
        # The SLS body must keep its YAML nesting; in particular the
        # dict case relies on the mapping being nested under 'contents'.
        with salt.utils.files.fopen(state_file, 'w') as fd_:
            fd_.write(textwrap.dedent('''\
                bool file:
                  file.managed:
                    - name: {bool}
                    - contents: True

                str file:
                  file.managed:
                    - name: {str}
                    - contents: Salt was here.

                int file:
                  file.managed:
                    - name: {int}
                    - contents: 340282366920938463463374607431768211456

                float file:
                  file.managed:
                    - name: {float}
                    - contents: 1.7518e-45 # gravitational coupling constant

                list file:
                  file.managed:
                    - name: {list}
                    - contents: [1, 1, 2, 3, 5, 8, 13]

                dict file:
                  file.managed:
                    - name: {dict}
                    - contents:
                        C: charge
                        P: parity
                        T: time
                '''.format(**managed_files)))

        ret = self.run_function('state.sls', [state_name])
        for state_key in state_keys.values():
            self.assertTrue(ret[state_key]['result'])
            self.assertIn('diff', ret[state_key]['changes'])
    finally:
        os.remove(state_file)
        for managed_path in managed_files.values():
            os.remove(managed_path)
@skip_if_not_root
@skipIf(IS_WINDOWS, 'Windows does not support "mode" kwarg. Skipping.')
@skipIf(not salt.utils.path.which('visudo'), 'sudo is missing')
def test_managed_check_cmd(self):
    '''
    Test file.managed passing a basic check_cmd kwarg. See Issue #38111.
    '''
    target = '/tmp/sudoers'
    # The group passed alongside user 'root' is 'wheel' on macOS
    r_group = 'wheel' if salt.utils.platform.is_darwin() else 'root'
    state_key = 'file_|-{0}_|-{0}_|-managed'.format(target)
    expected_changes = {'new': 'file /tmp/sudoers created', 'mode': '0440'}
    try:
        ret = self.run_state(
            'file.managed',
            name=target,
            user='root',
            group=r_group,
            mode=440,
            check_cmd='visudo -c -s -f'
        )
        self.assertSaltTrueReturn(ret)
        self.assertInSaltComment('Empty file', ret)
        self.assertEqual(ret[state_key]['changes'], expected_changes)
    finally:
        # Clean Up File
        if os.path.exists(target):
            os.remove(target)
def test_managed_local_source_with_source_hash(self):
    '''
    Make sure that we enforce the source_hash even with local files
    '''
    name = os.path.join(TMP, 'local_source_with_source_hash')
    local_path = os.path.join(BASE_FILES, 'grail', 'scene33')
    actual_hash = '567fd840bf1548edc35c48eb66cdd78bfdfcccff'
    if IS_WINDOWS:
        # CRLF vs LF causes a different hash on windows
        actual_hash = 'f658a0ec121d9c17088795afcc6ff3c43cb9842a'
    # Reverse the actual hash to get a well-formed but wrong digest
    bad_hash = actual_hash[::-1]

    def remove_file():
        '''
        Delete the destination file, ignoring the case where it is absent
        '''
        try:
            os.remove(name)
        except OSError as exc:
            if exc.errno != errno.ENOENT:
                raise

    def do_test(clean=False):
        '''
        Run the bad-hash and good-hash checks for both source spellings.
        When clean is True the destination is removed after each spelling.
        '''
        for proto in ('file://', ''):
            source = proto + local_path
            log.debug('Trying source %s', source)
            try:
                ret = self.run_state(
                    'file.managed',
                    name=name,
                    source=source,
                    source_hash='sha1={0}'.format(bad_hash))
                self.assertSaltFalseReturn(ret)
                ret = ret[next(iter(ret))]
                # Shouldn't be any changes
                self.assertFalse(ret['changes'])
                # Check that we identified a hash mismatch
                self.assertIn(
                    'does not match actual checksum', ret['comment'])

                ret = self.run_state(
                    'file.managed',
                    name=name,
                    source=source,
                    source_hash='sha1={0}'.format(actual_hash))
                self.assertSaltTrueReturn(ret)
            finally:
                if clean:
                    remove_file()

    remove_file()
    log.debug('Trying with nonexistant destination file')
    # clean=True so that every source spelling starts without a
    # destination file; otherwise the second spelling would run against
    # the file left behind by the first one.
    do_test(clean=True)

    log.debug('Trying with destination file already present')
    with salt.utils.files.fopen(name, 'w'):
        pass
    try:
        do_test(clean=False)
    finally:
        remove_file()
def test_managed_local_source_does_not_exist(self):
    '''
    Make sure that we exit gracefully when a local source doesn't exist
    '''
    name = os.path.join(TMP, 'local_source_does_not_exist')
    missing_path = os.path.join(BASE_FILES, 'grail', 'scene99')

    # Try the path both as a file:// URL and as a bare path
    for source in ('file://' + missing_path, '' + missing_path):
        log.debug('Trying source %s', source)
        ret = self.run_state('file.managed', name=name, source=source)
        self.assertSaltFalseReturn(ret)

        state_ret = ret[next(iter(ret))]
        # Shouldn't be any changes
        self.assertFalse(state_ret['changes'])
        # Check that the comment reports the missing source
        self.assertIn('does not exist', state_ret['comment'])
def test_managed_unicode_jinja_with_tojson_filter(self):
    '''
    Using {{ varname }} with a list or dictionary which contains unicode
    types on Python 2 makes Jinja render the "u" prefix on each string.
    This test confirms that the "tojson" jinja filter dumps such data in a
    form our YAML loader can load.

    The two rendered lines cover two problems PyYAML would hit without
    the "tojson" filter:

    1. A unicode string would be loaded as a literal including the
       leading "u" and the quotes (u'foo' would become u"u'foo'") instead
       of as the proper unicode value. One of the strings contains real
       non-ascii characters to confirm these are handled too.

    2. A unicode string containing a colon (such as a URL) would raise a
       ScannerError, because the colon would be taken as a mapping
       delimiter.

    Dumping the structure to JSON with "tojson" should give an inline
    data structure which is valid YAML.
    '''
    test_file = os.path.join(TMP, 'test-tojson.txt')
    apply_ret = self.run_function(
        'state.apply',
        mods='tojson',
        pillar={'tojson-file': test_file})
    ret = apply_ret[next(iter(apply_ret))]
    assert ret['result'], ret

    with salt.utils.files.fopen(test_file, mode='rb') as fp_:
        raw = fp_.read()
    managed = salt.utils.stringutils.to_unicode(raw)

    # NOTE(review): blank lines look stripped from this listing; confirm
    # whether the rendered file ends with an extra empty line.
    expected = dedent('''\
        Die Webseite ist https://saltstack.com.
        Der Zucker ist süß.
        ''')
    assert managed == expected, '{0!r} != {1!r}'.format(managed, expected)  # pylint: disable=repr-flag-used-in-string
def test_managed_source_hash_indifferent_case(self):
    '''
    Test passing a source_hash as an uppercase hash.

    This is a regression test for Issue #38914 and Issue #48230 (test=true use).
    '''
    name = os.path.join(TMP, 'source_hash_indifferent_case')
    state_name = 'file_|-{0}_|-{0}_|-managed'.format(name)
    local_path = os.path.join(BASE_FILES, 'hello_world.txt')
    if IS_WINDOWS:
        # CRLF vs LF causes a different hash on windows
        actual_hash = '92b772380a3f8e27a93e57e6deeca6c01da07f5aadce78bb2fbb20de10a66925'
    else:
        actual_hash = 'c98c24b677eff44860afea6f493bbaec5bb1c4cbb209c6fc2bbb47f66ff2ad31'
    uppercase_hash = actual_hash.upper()

    try:
        # Lay down tmp file to test against
        self.run_state(
            'file.managed',
            name=name,
            source=local_path,
            source_hash=actual_hash
        )

        # First a regular run, then the same run with test=True. Both must
        # return True with no changes when given the uppercase hash.
        for extra_kwargs in ({}, {'test': True}):
            ret = self.run_state(
                'file.managed',
                name=name,
                source=local_path,
                source_hash=uppercase_hash,
                **extra_kwargs
            )
            assert ret[state_name]['result'] is True
            assert ret[state_name]['changes'] == {}
    finally:
        # Clean Up File
        if os.path.exists(name):
            os.remove(name)
@with_tempfile(create=False)
def test_managed_latin1_diff(self, name):
    '''
    Tests that latin-1 file contents are represented properly in the diff
    '''
    def _manage(source):
        '''
        Apply file.managed for the given source and return the single
        state's return data
        '''
        full_ret = self.run_state('file.managed', name=name, source=source)
        return full_ret[next(iter(full_ret))]

    # Lay down the initial file
    ret = _manage('salt://issue-48777/old.html')
    assert ret['result'] is True, ret

    # Replace it with the new file and check the diff
    ret = _manage('salt://issue-48777/new.html')
    assert ret['result'] is True, ret
    diff_lines = ret['changes']['diff'].split(os.linesep)
    assert '+räksmörgås' in diff_lines, diff_lines
@with_tempfile()
def test_managed_keep_source_false_salt(self, name):
    '''
    This test ensures that we properly clean the cached file if keep_source
    is set to False, for source files using a salt:// URL
    '''
    source, saltenv = 'salt://grail/scene33', 'base'

    # Run the state
    full_ret = self.run_state(
        'file.managed',
        name=name,
        source=source,
        saltenv=saltenv,
        keep_source=False)
    state_ret = full_ret[next(iter(full_ret))]
    assert state_ret['result'] is True

    # Now make sure that the file is not cached
    result = self.run_function('cp.is_cached', [source, saltenv])
    assert result == '', 'File is still cached at {0}'.format(result)
@with_tempfile(create=False)
@with_tempfile(create=False)
def test_file_managed_onchanges(self, file1, file2):
    '''
    Test file.managed state with onchanges
    '''
    pillar = dict(
        file1=file1,
        file2=file2,
        source='salt://testfile',
        req='onchanges',
    )

    # Lay down the file used in the below SLS to ensure that when it is
    # run, there are no changes.
    self.run_state(
        'file.managed',
        name=pillar['file2'],
        source=pillar['source'])

    apply_ret = self.run_function(
        'state.apply',
        mods='onchanges_prereq',
        pillar=pillar,
        test=True,
    )
    ret = self.repack_state_returns(apply_ret)

    # 'one' should exit with None, 'three' with True
    assert ret['one']['result'] is None, ret['one']['result']
    assert ret['three']['result'] is True, ret['three']['result']

    # The first file state should have changes, since a new file was
    # created. The other one should not, since we already created that file
    # before applying the SLS file.
    assert ret['one']['changes']
    assert not ret['three']['changes'], ret['three']['changes']

    # The state watching 'one' should have been run due to changes
    assert ret['two']['comment'] == 'Success!', ret['two']['comment']

    # The state watching 'three' should not have been run
    not_run_comment = 'State was not run because none of the onchanges reqs changed'
    assert ret['four']['comment'] == not_run_comment, ret['four']['comment']
@with_tempfile(create=False)
@with_tempfile(create=False)
def test_file_managed_prereq(self, file1, file2):
    '''
    Test file.managed state with prereq
    '''
    pillar = dict(
        file1=file1,
        file2=file2,
        source='salt://testfile',
        req='prereq',
    )

    # Lay down the file used in the below SLS to ensure that when it is
    # run, there are no changes.
    self.run_state(
        'file.managed',
        name=pillar['file2'],
        source=pillar['source'])

    apply_ret = self.run_function(
        'state.apply',
        mods='onchanges_prereq',
        pillar=pillar,
        test=True,
    )
    ret = self.repack_state_returns(apply_ret)

    # 'one' should exit with None, 'three' with True
    assert ret['one']['result'] is None, ret['one']['result']
    assert ret['three']['result'] is True, ret['three']['result']

    # The first file state should have changes, since a new file was
    # created. The other one should not, since we already created that file
    # before applying the SLS file.
    assert ret['one']['changes']
    assert not ret['three']['changes'], ret['three']['changes']

    # The state watching 'one' should have been run due to changes
    assert ret['two']['comment'] == 'Success!', ret['two']['comment']

    # The state watching 'three' should not have been run
    not_run_comment = 'No changes detected'
    assert ret['four']['comment'] == not_run_comment, ret['four']['comment']
def test_directory(self):
    '''
    file.directory must succeed and leave a directory at the given path
    '''
    target = os.path.join(TMP, 'a_new_dir')
    state_ret = self.run_state('file.directory', name=target)
    self.assertSaltTrueReturn(state_ret)
    self.assertTrue(os.path.isdir(target))
def test_directory_symlink_dry_run(self):
    '''
    Ensure that symlinks are followed when file.directory is run with
    test=True
    '''
    # Bound before the try block so the cleanup in ``finally`` can always
    # reference them.
    tmp_dir = os.path.join(TMP, 'pgdata')
    sym_dir = os.path.join(TMP, 'pg_data')
    try:
        if IS_WINDOWS:
            self.run_function('file.mkdir', [tmp_dir, 'Administrators'])
        else:
            os.mkdir(tmp_dir, 0o700)

        self.run_function('file.symlink', [tmp_dir, sym_dir])

        # Ownership is checked on Windows, mode everywhere else
        if IS_WINDOWS:
            ret = self.run_state(
                'file.directory', test=True, name=sym_dir,
                follow_symlinks=True, win_owner='Administrators')
        else:
            ret = self.run_state(
                'file.directory', test=True, name=sym_dir,
                follow_symlinks=True, mode=700)

        self.assertSaltTrueReturn(ret)
    finally:
        if os.path.isdir(tmp_dir):
            self.run_function('file.remove', [tmp_dir])
        if os.path.islink(sym_dir):
            self.run_function('file.remove', [sym_dir])
@skip_if_not_root
@skipIf(IS_WINDOWS, 'Mode not available in Windows')
def test_directory_max_depth(self):
    '''
    file.directory

    Test the max_depth option by iteratively increasing the depth and
    checking that no changes deeper than max_depth have been attempted
    '''
    def _get_oct_mode(name):
        '''
        Return a string octal representation of the permissions for name
        '''
        perms = os.stat(name).st_mode & 0o777
        return salt.utils.files.normalize_mode(oct(perms))

    top = os.path.join(TMP, 'top_dir')
    sub = os.path.join(top, 'sub_dir')
    subsub = os.path.join(sub, 'sub_sub_dir')
    dirs = [top, sub, subsub]

    initial_mode = '0111'
    changed_mode = '0555'

    # Modes expected on Python >= 3.7 for directories below the current
    # depth, keyed by depth and then by path.
    initial_modes = {
        0: {sub: '0755', subsub: '0111'},
        1: {sub: '0111', subsub: '0111'},
        2: {sub: '0111', subsub: '0111'},
    }

    if not os.path.isdir(subsub):
        os.makedirs(subsub, int(initial_mode, 8))

    try:
        for depth in range(3):
            ret = self.run_state('file.directory',
                                 name=top,
                                 max_depth=depth,
                                 dir_mode=changed_mode,
                                 recurse=['mode'])
            self.assertSaltTrueReturn(ret)

            cutoff = depth + 1
            for changed_dir in dirs[:cutoff]:
                self.assertEqual(changed_mode, _get_oct_mode(changed_dir))

            for untouched_dir in dirs[cutoff:]:
                # Beginning in Python 3.7, os.makedirs no longer sets
                # the mode of intermediate directories to the mode that
                # is passed.
                if sys.version_info >= (3, 7):
                    expected_mode = initial_modes[depth][untouched_dir]
                else:
                    expected_mode = initial_mode
                self.assertEqual(expected_mode, _get_oct_mode(untouched_dir))
    finally:
        shutil.rmtree(top)
def test_test_directory(self):
    '''
    file.directory with test=True must return None and must not create
    the directory
    '''
    target = os.path.join(TMP, 'a_not_dir')
    state_ret = self.run_state('file.directory', test=True, name=target)
    self.assertSaltNoneReturn(state_ret)
    self.assertFalse(os.path.isdir(target))
@with_tempdir()
def test_directory_clean(self, base_dir):
    '''
    file.directory with clean=True
    '''
    name = os.path.join(base_dir, 'directory_clean_dir')
    os.mkdir(name)

    # A stray file directly inside the managed directory
    strayfile = os.path.join(name, 'strayfile')
    with salt.utils.files.fopen(strayfile, 'w'):
        pass

    # A stray subdirectory holding one more stray file
    straydir = os.path.join(name, 'straydir')
    if not os.path.isdir(straydir):
        os.makedirs(straydir)
    nested_stray = os.path.join(straydir, 'strayfile2')
    with salt.utils.files.fopen(nested_stray, 'w'):
        pass

    ret = self.run_state('file.directory', name=name, clean=True)
    self.assertSaltTrueReturn(ret)
    for removed in (strayfile, straydir):
        self.assertFalse(os.path.exists(removed))
    self.assertTrue(os.path.isdir(name))
def test_directory_is_idempotent(self):
    '''
    Ensure the file.directory state produces no changes when rerun.
    '''
    name = os.path.join(TMP, 'a_dir_twice')

    # On Windows the first run names the owner as DOMAIN\\user and the
    # second run as the bare user name.
    if IS_WINDOWS:
        username = os.environ.get('USERNAME', 'Administrators')
        domain = os.environ.get('USERDOMAIN', '')
        fullname = '{0}\\{1}'.format(domain, username)
        first_kwargs = {'win_owner': fullname}
    else:
        first_kwargs = {}
    ret = self.run_state('file.directory', name=name, **first_kwargs)
    self.assertSaltTrueReturn(ret)

    if IS_WINDOWS:
        second_kwargs = {'win_owner': username}
    else:
        second_kwargs = {}
    ret = self.run_state('file.directory', name=name, **second_kwargs)
    self.assertSaltTrueReturn(ret)
    self.assertSaltStateChangesEqual(ret, {})
@with_tempdir()
def test_directory_clean_exclude(self, base_dir):
    '''
    file.directory with clean=True and exclude_pat set
    '''
    name = os.path.join(base_dir, 'directory_clean_dir')
    if not os.path.isdir(name):
        os.makedirs(name)

    strayfile = os.path.join(name, 'strayfile')
    with salt.utils.files.fopen(strayfile, 'w'):
        pass

    straydir = os.path.join(name, 'straydir')
    if not os.path.isdir(straydir):
        os.makedirs(straydir)

    # Two files inside straydir: one to be cleaned, one excluded
    strayfile2 = os.path.join(straydir, 'strayfile2')
    keepfile = os.path.join(straydir, 'keepfile')
    for nested_path in (strayfile2, keepfile):
        with salt.utils.files.fopen(nested_path, 'w'):
            pass

    # The pattern uses the platform's path separator
    if IS_WINDOWS:
        exclude_pat = 'E@^straydir(|\\\\keepfile)$'
    else:
        exclude_pat = 'E@^straydir(|/keepfile)$'

    ret = self.run_state('file.directory',
                         name=name,
                         clean=True,
                         exclude_pat=exclude_pat)

    self.assertSaltTrueReturn(ret)
    self.assertFalse(os.path.exists(strayfile))
    self.assertFalse(os.path.exists(strayfile2))
    self.assertTrue(os.path.exists(keepfile))
@skipIf(IS_WINDOWS, 'Skip on windows')
@with_tempdir()
def test_test_directory_clean_exclude(self, base_dir):
    '''
    file.directory with test=True, clean=True and exclude_pat set

    Skipped on windows because clean and exclude_pat not supported by
    salt.states.file._check_directory_win
    '''
    name = os.path.join(base_dir, 'directory_clean_dir')
    os.mkdir(name)

    strayfile = os.path.join(name, 'strayfile')
    with salt.utils.files.fopen(strayfile, 'w'):
        pass

    straydir = os.path.join(name, 'straydir')
    if not os.path.isdir(straydir):
        os.makedirs(straydir)

    strayfile2 = os.path.join(straydir, 'strayfile2')
    with salt.utils.files.fopen(strayfile2, 'w'):
        pass

    keepfile = os.path.join(straydir, 'keepfile')
    with salt.utils.files.fopen(keepfile, 'w'):
        pass

    # This test never runs on Windows (see the skipIf above), so only the
    # POSIX form of the pattern is needed.
    exclude_pat = 'E@^straydir(|/keepfile)$'

    ret = self.run_state('file.directory',
                         test=True,
                         name=name,
                         clean=True,
                         exclude_pat=exclude_pat)

    comment = next(six.itervalues(ret))['comment']

    self.assertSaltNoneReturn(ret)
    # test=True: nothing may actually be removed
    self.assertTrue(os.path.exists(strayfile))
    self.assertTrue(os.path.exists(strayfile2))
    self.assertTrue(os.path.exists(keepfile))

    # The comment must list the stray files but not the excluded one
    self.assertIn(strayfile, comment)
    self.assertIn(strayfile2, comment)
    self.assertNotIn(keepfile, comment)
@with_tempdir()
def test_directory_clean_require_in(self, name):
    '''
    file.directory test with clean=True and require_in file
    '''
    state_name = 'file-FileTest-test_directory_clean_require_in'
    state_filename = state_name + '.sls'
    state_file = os.path.join(BASE_FILES, state_filename)

    # File not referenced by the SLS; expected to be cleaned
    wrong_file = os.path.join(name, "wrong")
    with salt.utils.files.fopen(wrong_file, "w") as fp:
        fp.write("foo")

    # File managed by the SLS; expected to exist afterwards
    good_file = os.path.join(name, "bar")

    # The SLS body must keep its YAML nesting so that require_in is an
    # argument of file.managed and its entry a child of require_in.
    with salt.utils.files.fopen(state_file, 'w') as fp:
        self.addCleanup(os.remove, state_file)
        fp.write(textwrap.dedent('''\
            some_dir:
              file.directory:
                - name: {name}
                - clean: true

            {good_file}:
              file.managed:
                - require_in:
                  - file: some_dir
            '''.format(name=name, good_file=good_file)))

    self.run_function('state.sls', [state_name])
    self.assertTrue(os.path.exists(good_file))
    self.assertFalse(os.path.exists(wrong_file))
@with_tempdir()
def test_directory_clean_require_in_with_id(self, name):
    '''
    file.directory test with clean=True and require_in file with an ID
    different from the file name
    '''
    state_name = 'file-FileTest-test_directory_clean_require_in_with_id'
    state_filename = state_name + '.sls'
    state_file = os.path.join(BASE_FILES, state_filename)

    # File not referenced by the SLS; expected to be cleaned
    wrong_file = os.path.join(name, "wrong")
    with salt.utils.files.fopen(wrong_file, "w") as fp:
        fp.write("foo")

    # File managed by the SLS under the ID 'some_file'
    good_file = os.path.join(name, "bar")

    # The SLS body must keep its YAML nesting so that require_in is an
    # argument of file.managed and its entry a child of require_in.
    with salt.utils.files.fopen(state_file, 'w') as fp:
        self.addCleanup(os.remove, state_file)
        fp.write(textwrap.dedent('''\
            some_dir:
              file.directory:
                - name: {name}
                - clean: true

            some_file:
              file.managed:
                - name: {good_file}
                - require_in:
                  - file: some_dir
            '''.format(name=name, good_file=good_file)))

    self.run_function('state.sls', [state_name])
    self.assertTrue(os.path.exists(good_file))
    self.assertFalse(os.path.exists(wrong_file))
@with_tempdir()
def test_directory_clean_require_with_name(self, name):
    '''
    file.directory test with clean=True and require with a file state
    relatively to the state's name, not its ID.
    '''
    # Use this test's own SLS name; it previously reused the name of the
    # "_require_in_with_id" test, so both wrote the same SLS file.
    state_name = 'file-FileTest-test_directory_clean_require_with_name'
    state_filename = state_name + '.sls'
    state_file = os.path.join(BASE_FILES, state_filename)

    # File not referenced by the SLS; expected to be cleaned
    wrong_file = os.path.join(name, "wrong")
    with salt.utils.files.fopen(wrong_file, "w") as fp:
        fp.write("foo")

    # File managed by the SLS under the ID 'some_file'
    good_file = os.path.join(name, "bar")

    # The SLS body must keep its YAML nesting so that require is an
    # argument of file.directory and its entry a child of require.
    with salt.utils.files.fopen(state_file, 'w') as fp:
        self.addCleanup(os.remove, state_file)
        fp.write(textwrap.dedent('''\
            some_dir:
              file.directory:
                - name: {name}
                - clean: true
                - require:
                  # This requirement refers to the name of the following
                  # state, not its ID.
                  - file: {good_file}

            some_file:
              file.managed:
                - name: {good_file}
            '''.format(name=name, good_file=good_file)))

    self.run_function('state.sls', [state_name])
    self.assertTrue(os.path.exists(good_file))
    self.assertFalse(os.path.exists(wrong_file))
def test_directory_broken_symlink(self):
    '''
    Ensure that file.directory works even if a directory
    contains broken symbolic link
    '''
    # Bound before the try block so the cleanup in ``finally`` can always
    # reference tmp_dir.
    tmp_dir = os.path.join(TMP, 'foo')
    # null_file is never created, so the link pointing at it is broken
    null_file = '{0}/null'.format(tmp_dir)
    broken_link = '{0}/broken'.format(tmp_dir)
    try:
        if IS_WINDOWS:
            self.run_function('file.mkdir', [tmp_dir, 'Administrators'])
        else:
            os.mkdir(tmp_dir, 0o700)

        self.run_function('file.symlink', [null_file, broken_link])

        if IS_WINDOWS:
            ret = self.run_state(
                'file.directory', name=tmp_dir, recurse={'mode'},
                follow_symlinks=True, win_owner='Administrators')
        else:
            ret = self.run_state(
                'file.directory', name=tmp_dir, recurse={'mode'},
                file_mode=644, dir_mode=755)

        self.assertSaltTrueReturn(ret)
    finally:
        if os.path.isdir(tmp_dir):
            self.run_function('file.remove', [tmp_dir])
@with_tempdir(create=False)
def test_recurse(self, name):
    '''
    file.recurse from salt://grail must succeed and lay down 36/scene
    '''
    state_ret = self.run_state(
        'file.recurse',
        name=name,
        source='salt://grail',
    )
    self.assertSaltTrueReturn(state_ret)
    expected_file = os.path.join(name, '36', 'scene')
    self.assertTrue(os.path.isfile(expected_file))
@with_tempdir(create=False)
@with_tempdir(create=False)
def test_recurse_specific_env(self, dir1, dir2):
    '''
    file.recurse passing __env__
    '''
    # First with the __env__ keyword, then with saltenv
    for target, env_kwargs in ((dir1, {'__env__': 'prod'}),
                               (dir2, {'saltenv': 'prod'})):
        ret = self.run_state('file.recurse',
                             name=target,
                             source='salt://holy',
                             **env_kwargs)
        self.assertSaltTrueReturn(ret)
        self.assertTrue(os.path.isfile(os.path.join(target, '32', 'scene')))
@with_tempdir(create=False)
@with_tempdir(create=False)
def test_recurse_specific_env_in_url(self, dir1, dir2):
    '''
    file.recurse with the saltenv given as a query parameter of the
    source URL
    '''
    source = 'salt://holy?saltenv=prod'
    # The same state is applied to two separate destinations
    for target in (dir1, dir2):
        ret = self.run_state('file.recurse', name=target, source=source)
        self.assertSaltTrueReturn(ret)
        self.assertTrue(os.path.isfile(os.path.join(target, '32', 'scene')))
@with_tempdir(create=False)
def test_test_recurse(self, name):
    '''
    file.recurse test interface
    '''
    state_kwargs = {
        'test': True,
        'name': name,
        'source': 'salt://grail',
    }
    ret = self.run_state('file.recurse', **state_kwargs)
    self.assertSaltNoneReturn(ret)
    # Neither the file nor the destination directory may be created
    self.assertFalse(os.path.isfile(os.path.join(name, '36', 'scene')))
    self.assertFalse(os.path.exists(name))
@with_tempdir(create=False)
@with_tempdir(create=False)
def test_test_recurse_specific_env(self, dir1, dir2):
    '''
    file.recurse test interface
    '''
    # First with the __env__ keyword, then with saltenv
    for target, env_kwargs in ((dir1, {'__env__': 'prod'}),
                               (dir2, {'saltenv': 'prod'})):
        ret = self.run_state('file.recurse',
                             test=True,
                             name=target,
                             source='salt://holy',
                             **env_kwargs)
        self.assertSaltNoneReturn(ret)
        # Neither the file nor the destination directory may be created
        self.assertFalse(os.path.isfile(os.path.join(target, '32', 'scene')))
        self.assertFalse(os.path.exists(target))
@with_tempdir(create=False)
def test_recurse_template(self, name):
    '''
    file.recurse with jinja template enabled
    '''
    marker = 'TEMPLATE TEST STRING'
    ret = self.run_state(
        'file.recurse',
        name=name,
        source='salt://grail',
        template='jinja',
        defaults={'spam': marker})
    self.assertSaltTrueReturn(ret)

    # The value passed through defaults must show up in scene33
    rendered_path = os.path.join(name, 'scene33')
    with salt.utils.files.fopen(rendered_path, 'r') as fp_:
        contents = fp_.read()
    self.assertIn(marker, contents)
@with_tempdir()
def test_recurse_clean(self, name):
    '''
    file.recurse with clean=True
    '''
    strayfile = os.path.join(name, 'strayfile')
    with salt.utils.files.fopen(strayfile, 'w'):
        pass

    # Corner cases: replacing file with a directory and vice versa
    file_in_place_of_dir = os.path.join(name, '36')
    dir_in_place_of_file = os.path.join(name, 'scene33')
    with salt.utils.files.fopen(file_in_place_of_dir, 'w'):
        pass
    os.makedirs(dir_in_place_of_file)

    ret = self.run_state(
        'file.recurse', name=name, source='salt://grail', clean=True)
    self.assertSaltTrueReturn(ret)
    self.assertFalse(os.path.exists(strayfile))
    self.assertTrue(os.path.isfile(os.path.join(name, '36', 'scene')))
    self.assertTrue(os.path.isfile(dir_in_place_of_file))
@with_tempdir()
def test_recurse_clean_specific_env(self, name):
    '''
    file.recurse with clean=True and __env__=prod
    '''
    strayfile = os.path.join(name, 'strayfile')
    with salt.utils.files.fopen(strayfile, 'w'):
        pass

    # Corner cases: replacing file with a directory and vice versa
    file_in_place_of_dir = os.path.join(name, '32')
    dir_in_place_of_file = os.path.join(name, 'scene34')
    with salt.utils.files.fopen(file_in_place_of_dir, 'w'):
        pass
    os.makedirs(dir_in_place_of_file)

    ret = self.run_state('file.recurse',
                         name=name,
                         source='salt://holy',
                         clean=True,
                         __env__='prod')
    self.assertSaltTrueReturn(ret)
    self.assertFalse(os.path.exists(strayfile))
    self.assertTrue(os.path.isfile(os.path.join(name, '32', 'scene')))
    self.assertTrue(os.path.isfile(dir_in_place_of_file))
@skipIf(IS_WINDOWS, 'Skip on windows')
@with_tempdir()
def test_recurse_issue_34945(self, base_dir):
    '''
    Cover the case where the source dir of a file.recurse state holds no
    files (only subdirectories) while dir_mode is being managed.

    For a long time this corner case left the top level of the destination
    directory with the wrong initial permissions. Normally file.recurse
    corrects them later by running file.directory, but that state only
    gets called when there are files to manage in the directory, so with a
    source made up of subdirectories only the wrong perms were never
    repaired.

    This was fixed in https://github.com/saltstack/salt/pull/35309

    Skipped on windows because dir_mode is not supported.
    '''
    dir_mode = '2775'
    issue_dir = 'issue-34945'
    name = os.path.join(base_dir, issue_dir)

    ret = self.run_state('file.recurse',
                         name=name,
                         source='salt://' + issue_dir,
                         dir_mode=dir_mode)
    self.assertSaltTrueReturn(ret)

    # Compare the last four octal digits of the mode bits with dir_mode
    mode_bits = stat.S_IMODE(os.stat(name).st_mode)
    actual_dir_mode = oct(mode_bits)[-4:]
    self.assertEqual(dir_mode, actual_dir_mode)
@with_tempdir(create=False)
def test_recurse_issue_40578(self, name):
    '''
    This ensures that the state doesn't raise an exception when it
    encounters a file with a unicode filename in the process of invoking
    file.source_list.
    '''
    ret = self.run_state('file.recurse',
                         name=name,
                         source='salt://соль')
    self.assertSaltTrueReturn(ret)

    if not (six.PY2 and IS_WINDOWS):
        files = salt.utils.data.decode(os.listdir(name), normalize=True)
    else:
        # Providing unicode to os.listdir so that we avoid having listdir
        # try to decode the filenames using the systemencoding on windows
        # python 2.
        files = os.listdir(name.decode('utf-8'))

    expected_files = ['foo.txt', 'спам.txt', 'яйца.txt']
    self.assertEqual(sorted(files), sorted(expected_files))
@with_tempfile()
def test_replace(self, name):
    '''
    file.replace must rewrite the matched pattern in place and succeed
    '''
    with salt.utils.files.fopen(name, 'w+') as handle:
        handle.write('change_me')

    ret = self.run_state(
        'file.replace',
        name=name,
        pattern='change',
        repl='salt',
        backup=False)

    with salt.utils.files.fopen(name, 'r') as handle:
        self.assertIn('salt', handle.read())

    self.assertSaltTrueReturn(ret)
@with_tempdir()
def test_replace_issue_18612(self, base_dir):
    '''
    Test the (mis-)behaviour of file.replace as described in #18612:

    Using 'prepend_if_not_found' or 'append_if_not_found' resulted in
    an infinitely growing file as 'file.replace' didn't check beforehand
    whether the changes had already been done to the file

    # Case description:

    The tested file contains one commented line
    The commented line should be uncommented in the end, nothing else should change
    '''
    test_name = 'test_replace_issue_18612'
    path_test = os.path.join(base_dir, test_name)

    with salt.utils.files.fopen(path_test, 'w+') as fp_test_:
        fp_test_.write('# en_US.UTF-8')

    # Apply the same state three times in a row
    ret = [
        self.run_state('file.replace',
                       name=path_test,
                       pattern='^# en_US.UTF-8$',
                       repl='en_US.UTF-8',
                       append_if_not_found=True)
        for _ in range(3)
    ]

    # ensure, the number of lines didn't change, even after invoking 'file.replace' 3 times
    with salt.utils.files.fopen(path_test, 'r') as fp_test_:
        line_count = sum(1 for _ in fp_test_)
    self.assertTrue(line_count == 1)

    # ensure, the replacement succeeded
    with salt.utils.files.fopen(path_test, 'r') as fp_test_:
        self.assertTrue(fp_test_.read().startswith('en_US.UTF-8'))

    # ensure, all runs of 'file.replace' reported success
    for item in ret:
        self.assertSaltTrueReturn(item)
@with_tempdir()
def test_replace_issue_18612_prepend(self, base_dir):
    '''
    Test the (mis-)behaviour of file.replace as described in #18612:

    Using 'prepend_if_not_found' or 'append_if_not_found' resulted in
    an infinitely growing file as 'file.replace' didn't check beforehand
    whether the changes had already been done to the file

    # Case description:

    The tested multifile contains multiple lines not matching the pattern or replacement in any way
    The replacement pattern should be prepended to the file
    '''
    test_name = 'test_replace_issue_18612_prepend'
    fixture_dir = os.path.join(FILES, 'file.replace')
    path_in = os.path.join(fixture_dir, '{0}.in'.format(test_name))
    path_out = os.path.join(fixture_dir, '{0}.out'.format(test_name))
    path_test = os.path.join(base_dir, test_name)

    # create test file based on initial template
    shutil.copyfile(path_in, path_test)

    # Apply the same state three times in a row
    ret = [
        self.run_state('file.replace',
                       name=path_test,
                       pattern='^# en_US.UTF-8$',
                       repl='en_US.UTF-8',
                       prepend_if_not_found=True)
        for _ in range(3)
    ]

    # ensure, the resulting file contains the expected lines
    self.assertTrue(filecmp.cmp(path_test, path_out))

    # ensure the initial file was properly backed up
    self.assertTrue(filecmp.cmp(path_test + '.bak', path_in))

    # ensure, all runs of 'file.replace' reported success
    for item in ret:
        self.assertSaltTrueReturn(item)
@with_tempdir()
def test_replace_issue_18612_append(self, base_dir):
    '''
    Test the (mis-)behaviour of file.replace as described in #18612:

    Using 'prepend_if_not_found' or 'append_if_not_found' resulted in
    an infinitely growing file as 'file.replace' didn't check beforehand
    whether the changes had already been done to the file

    # Case description:

    The tested multifile contains multiple lines not matching the pattern or replacement in any way
    The replacement pattern should be appended to the file
    '''
    test_name = 'test_replace_issue_18612_append'
    fixture_dir = os.path.join(FILES, 'file.replace')
    path_in = os.path.join(fixture_dir, '{0}.in'.format(test_name))
    path_out = os.path.join(fixture_dir, '{0}.out'.format(test_name))
    path_test = os.path.join(base_dir, test_name)

    # create test file based on initial template
    shutil.copyfile(path_in, path_test)

    # Apply the same state three times in a row
    ret = [
        self.run_state('file.replace',
                       name=path_test,
                       pattern='^# en_US.UTF-8$',
                       repl='en_US.UTF-8',
                       append_if_not_found=True)
        for _ in range(3)
    ]

    # ensure, the resulting file contains the expected lines
    self.assertTrue(filecmp.cmp(path_test, path_out))

    # ensure the initial file was properly backed up
    self.assertTrue(filecmp.cmp(path_test + '.bak', path_in))

    # ensure, all runs of 'file.replace' reported success
    for item in ret:
        self.assertSaltTrueReturn(item)
@with_tempdir()
def test_replace_issue_18612_append_not_found_content(self, base_dir):
    '''
    Regression test for #18612, 'not_found_content' variant.

    With 'prepend_if_not_found' or 'append_if_not_found', file.replace
    used to grow the file on every run because it never checked whether
    the change had already been applied.

    Case: the fixture holds several lines, none of which match the
    pattern or the replacement; the 'not_found_content' value (rather
    than 'repl') is what should end up appended to the file.
    '''
    case = 'test_replace_issue_18612_append_not_found_content'
    fixture_dir = os.path.join(FILES, 'file.replace')
    path_in = os.path.join(fixture_dir, '{0}.in'.format(case))
    path_out = os.path.join(fixture_dir, '{0}.out'.format(case))
    path_test = os.path.join(base_dir, case)

    # Start from a pristine copy of the input fixture
    shutil.copyfile(path_in, path_test)

    results = [
        self.run_state(
            'file.replace',
            name=path_test,
            pattern='^# en_US.UTF-8$',
            repl='en_US.UTF-8',
            append_if_not_found=True,
            not_found_content='THIS LINE WASN\'T FOUND! SO WE\'RE APPENDING IT HERE!'
        )
        for _ in range(3)
    ]

    # Resulting content matches the expected output fixture
    self.assertTrue(filecmp.cmp(path_test, path_out))
    # The backup holds the original content
    self.assertTrue(filecmp.cmp(path_test + '.bak', path_in))

    # All three runs reported success
    for outcome in results:
        self.assertSaltTrueReturn(outcome)
@with_tempdir()
def test_replace_issue_18612_change_mid_line_with_comment(self, base_dir):
    '''
    Regression test for #18612, in-place change of a commented line.

    With 'prepend_if_not_found' or 'append_if_not_found', file.replace
    used to grow the file on every run because it never checked whether
    the change had already been applied.

    Case: the fixture holds 5 key=value pairs. The commented pair
    '#foo=bar' must become 'foo=salt', i.e. the value changes and the
    leading comment character is dropped.
    '''
    case = 'test_replace_issue_18612_change_mid_line_with_comment'
    fixture_dir = os.path.join(FILES, 'file.replace')
    path_in = os.path.join(fixture_dir, '{0}.in'.format(case))
    path_out = os.path.join(fixture_dir, '{0}.out'.format(case))
    path_test = os.path.join(base_dir, case)

    # Start from a pristine copy of the input fixture
    shutil.copyfile(path_in, path_test)

    state_kwargs = {
        'name': path_test,
        'pattern': '^#foo=bar($|(?=\r\n))',
        'repl': 'foo=salt',
        'append_if_not_found': True,
    }
    results = []
    for _ in range(3):
        results.append(self.run_state('file.replace', **state_kwargs))

    # Resulting content matches the expected output fixture
    self.assertTrue(filecmp.cmp(path_test, path_out))
    # The backup holds the original content
    self.assertTrue(filecmp.cmp(path_test + '.bak', path_in))

    # All three runs reported success
    for outcome in results:
        self.assertSaltTrueReturn(outcome)
@with_tempdir()
def test_replace_issue_18841_no_changes(self, base_dir):
    '''
    Test the (mis-)behaviour of file.replace as described in #18841:

    Using file.replace in a way which shouldn't modify the file at all
    results in changed mtime of the original file and a backup file being
    created.

    Case description:

    The tested file contains multiple lines
    The tested file contains a line already matching the replacement
    (no change needed)
    The tested file's content shouldn't change at all
    The tested file's mtime shouldn't change at all
    No backup file should be created
    '''
    test_name = 'test_replace_issue_18841_no_changes'
    path_in = os.path.join(
        FILES, 'file.replace', '{0}.in'.format(test_name)
    )
    path_test = os.path.join(base_dir, test_name)

    # create test file based on initial template
    shutil.copyfile(path_in, path_test)

    # get (m|a)time of file
    fstats_orig = os.stat(path_test)

    # define how far we predate the file
    age = 5 * 24 * 60 * 60

    # set (m|a)time of file 5 days into the past; os.utime() expects the
    # tuple in (atime, mtime) order
    os.utime(
        path_test,
        (fstats_orig.st_atime - age, fstats_orig.st_mtime - age)
    )

    # Read the predated mtime back from the filesystem, so that the
    # comparison below is not affected by timestamp rounding
    mtime_before = os.stat(path_test).st_mtime

    ret = self.run_state(
        'file.replace',
        name=path_test,
        pattern='^hello world$',
        repl='goodbye world',
        show_changes=True,
        flags=['IGNORECASE'],
        backup=False
    )

    # get (m|a)time of file
    fstats_post = os.stat(path_test)

    # ensure, the file content didn't change
    self.assertTrue(filecmp.cmp(path_in, path_test))

    # ensure no backup file was created
    self.assertFalse(os.path.exists(path_test + '.bak'))

    # ensure the file's mtime didn't change (assertTrue(a, b) would only
    # check that 'a' is truthy and use 'b' as the failure message)
    self.assertEqual(fstats_post.st_mtime, mtime_before)

    # ensure, all 'file.replace' runs reported success
    self.assertSaltTrueReturn(ret)
def test_serialize(self):
    '''
    Check that file.serialize with the json formatter writes a file that
    is both serialized and formatted as expected
    '''
    target = os.path.join(TMP, 'test_serialize')
    dataset = {
        'name': 'naive',
        'description': 'A basic test',
        'a_list': ['first_element', 'second_element'],
        'finally': 'the last item',
    }
    self.run_state(
        'file.serialize',
        name=target,
        dataset=dataset,
        formatter='json'
    )

    with salt.utils.files.fopen(target, 'rb') as handle:
        raw = handle.read()
    serialized_file = salt.utils.stringutils.to_unicode(raw)

    # The expected text is joined with LF on purpose: the JSON serializer
    # uses LF even on OSes where os.linesep is CRLF.
    # NOTE(review): the leading whitespace inside these literals looks
    # collapsed in this copy of the file - confirm the indentation width
    # against the serializer's real output.
    expected_lines = (
        '{',
        ' "a_list": [',
        ' "first_element",',
        ' "second_element"',
        ' ],',
        ' "description": "A basic test",',
        ' "finally": "the last item",',
        ' "name": "naive"',
        '}',
        '',
    )
    expected_file = '\n'.join(expected_lines)
    self.assertEqual(serialized_file, expected_file)
@with_tempfile(create=False)
def test_serializer_deserializer_opts(self, name):
    '''
    Exercise the serializer_opts and deserializer_opts options of
    file.serialize, using the configparser formatter
    '''
    initial_data = {'foo': {'bar': '%(x)s'}}
    extra_data = {'foo': {'abc': 123}}
    merged = {'foo': {'y': 'not_used', 'x': 'baz', 'abc': 123, 'bar': u'baz'}}

    # First pass: deserializer_opts without merge_if_exists
    first_run = self.run_state(
        'file.serialize',
        name=name,
        dataset=initial_data,
        formatter='configparser',
        deserializer_opts=[{'defaults': {'y': 'not_used'}}])
    ret = first_run[next(iter(first_run))]
    assert ret['result'], ret
    # Passing deserializer_opts while merge_if_exists is not True is
    # expected to produce a warning.
    assert 'warnings' in ret

    # Second pass: merge_if_exists together with both option sets. The
    # deserializer defaults drive the interpolation of the %(x)s that the
    # first pass wrote to the file (i.e. bar should become baz).
    second_run = self.run_state(
        'file.serialize',
        name=name,
        dataset=extra_data,
        formatter='configparser',
        merge_if_exists=True,
        serializer_opts=[{'defaults': {'y': 'not_used'}}],
        deserializer_opts=[{'defaults': {'x': 'baz'}}])
    ret = second_run[next(iter(second_run))]
    assert ret['result'], ret

    with salt.utils.files.fopen(name) as handle:
        serialized_data = salt.serializers.configparser.deserialize(handle)

    # Logged so that a failure shows how the data on disk differs from
    # the expectation.
    log.debug('serialized_data = %r', serialized_data)
    log.debug('merged = %r', merged)

    foo_actual = serialized_data['foo']
    foo_wanted = merged['foo']
    # serializing with a default of 'y' adds y = not_used into foo
    assert foo_actual['y'] == foo_wanted['y']
    # deserializing with a default of x = baz interpolates %(x)s, so bar
    # ends up being baz
    assert foo_actual['bar'] == foo_wanted['bar']
@with_tempdir()
def test_replace_issue_18841_omit_backup(self, base_dir):
    '''
    Test the (mis-)behaviour of file.replace as described in #18841:

    Using file.replace in a way which shouldn't modify the file at all
    results in changed mtime of the original file and a backup file being
    created.

    Case description:

    The tested file contains multiple lines
    The tested file contains a line already matching the replacement
    (no change needed)
    The tested file's content shouldn't change at all
    The tested file's mtime shouldn't change at all
    No backup file should be created, although backup=False isn't
    explicitly defined
    '''
    test_name = 'test_replace_issue_18841_omit_backup'
    path_in = os.path.join(
        FILES, 'file.replace', '{0}.in'.format(test_name)
    )
    path_test = os.path.join(base_dir, test_name)

    # create test file based on initial template
    shutil.copyfile(path_in, path_test)

    # get (m|a)time of file
    fstats_orig = os.stat(path_test)

    # define how far we predate the file
    age = 5 * 24 * 60 * 60

    # set (m|a)time of file 5 days into the past; os.utime() expects the
    # tuple in (atime, mtime) order
    os.utime(
        path_test,
        (fstats_orig.st_atime - age, fstats_orig.st_mtime - age)
    )

    # Read the predated mtime back from the filesystem, so that the
    # comparison below is not affected by timestamp rounding
    mtime_before = os.stat(path_test).st_mtime

    ret = self.run_state(
        'file.replace',
        name=path_test,
        pattern='^hello world$',
        repl='goodbye world',
        show_changes=True,
        flags=['IGNORECASE']
    )

    # get (m|a)time of file
    fstats_post = os.stat(path_test)

    # ensure, the file content didn't change
    self.assertTrue(filecmp.cmp(path_in, path_test))

    # ensure no backup file was created
    self.assertFalse(os.path.exists(path_test + '.bak'))

    # ensure the file's mtime didn't change (assertTrue(a, b) would only
    # check that 'a' is truthy and use 'b' as the failure message)
    self.assertEqual(fstats_post.st_mtime, mtime_before)

    # ensure, all 'file.replace' runs reported success
    self.assertSaltTrueReturn(ret)
@with_tempfile()
def test_comment(self, name):
    '''
    file.comment: dry run, first real run, repeated run and a final dry
    run on the already commented file
    '''
    def assert_line_is_commented():
        with salt.utils.files.fopen(name, 'r') as handle:
            self.assertTrue(handle.read().startswith('#comment'))

    # Seed the file with a single uncommented line
    with salt.utils.files.fopen(name, 'w+') as handle:
        handle.write('comment_me')

    # Dry run on the pristine file: result is expected to be None
    dry_run = self.run_state('file.comment', test=True, name=name, regex='^comment')
    self.assertSaltNoneReturn(dry_run)

    # First real run: succeeds and comments the line
    first = self.run_state('file.comment', name=name, regex='^comment')
    self.assertSaltTrueReturn(first)
    assert_line_is_commented()

    # Second real run: still succeeds, line still commented
    second = self.run_state('file.comment', name=name, regex='^comment')
    self.assertSaltTrueReturn(second)
    assert_line_is_commented()

    # Dry run on the already commented file: now True instead of None
    dry_run = self.run_state('file.comment', test=True, name=name, regex='^comment')
    self.assertSaltTrueReturn(dry_run)
@with_tempfile()
def test_test_comment(self, name):
    '''
    file.comment with test=True: the file must stay untouched and the
    result must be None
    '''
    with salt.utils.files.fopen(name, 'w+') as handle:
        handle.write('comment_me')

    dry_run = self.run_state(
        'file.comment',
        test=True,
        name=name,
        regex='.*comment.*'
    )

    with salt.utils.files.fopen(name, 'r') as handle:
        content = handle.read()
    self.assertNotIn('#comment', content)
    self.assertSaltNoneReturn(dry_run)
@with_tempfile()
def test_uncomment(self, name):
    '''
    file.uncomment: a commented line gets its comment marker removed
    '''
    with salt.utils.files.fopen(name, 'w+') as handle:
        handle.write('#comment_me')

    outcome = self.run_state('file.uncomment', name=name, regex='^comment')

    with salt.utils.files.fopen(name, 'r') as handle:
        content = handle.read()
    self.assertNotIn('#comment', content)
    self.assertSaltTrueReturn(outcome)
@with_tempfile()
def test_test_uncomment(self, name):
    '''
    file.uncomment with test=True: the commented line must stay commented
    and the result must be None
    '''
    with salt.utils.files.fopen(name, 'w+') as handle:
        handle.write('#comment_me')

    dry_run = self.run_state(
        'file.uncomment',
        test=True,
        name=name,
        regex='^comment.*'
    )

    with salt.utils.files.fopen(name, 'r') as handle:
        content = handle.read()
    self.assertIn('#comment', content)
    self.assertSaltNoneReturn(dry_run)
@with_tempfile()
def test_append(self, name):
    '''
    file.append: the given text shows up in the file
    '''
    with salt.utils.files.fopen(name, 'w+') as handle:
        handle.write('#salty!')

    outcome = self.run_state('file.append', name=name, text='cheese')

    with salt.utils.files.fopen(name, 'r') as handle:
        content = handle.read()
    self.assertIn('cheese', content)
    self.assertSaltTrueReturn(outcome)
@with_tempfile()
def test_test_append(self, name):
    '''
    file.append with test=True: nothing is written and the result is None
    '''
    with salt.utils.files.fopen(name, 'w+') as handle:
        handle.write('#salty!')

    dry_run = self.run_state(
        'file.append',
        test=True,
        name=name,
        text='cheese'
    )

    with salt.utils.files.fopen(name, 'r') as handle:
        content = handle.read()
    self.assertNotIn('cheese', content)
    self.assertSaltNoneReturn(dry_run)
@with_tempdir()
def test_append_issue_1864_makedirs(self, base_dir):
    '''
    file.append with optional directory creation, including creation of
    the target file when it does not exist yet
    '''
    fname = 'append_issue_1864_makedirs'

    # 1) Missing file directly below the base directory
    target = os.path.join(base_dir, fname)
    outcome = self.run_state(
        'file.append', name=target, text='cheese', makedirs=True
    )
    self.assertSaltTrueReturn(outcome)

    # 2) Missing file inside a directory that does not exist yet
    target = os.path.join(base_dir, 'issue_1864', fname)
    outcome = self.run_state(
        'file.append', name=target, text='cheese', makedirs=True
    )
    self.assertSaltTrueReturn(outcome)

    # 3) Parent directory is there by now, the file is not, and
    #    makedirs is left out
    target = os.path.join(base_dir, 'issue_1864', fname + '2')
    outcome = self.run_state('file.append', name=target, text='cheese')
    self.assertSaltTrueReturn(outcome)
    self.assertTrue(os.path.isfile(target))
@with_tempdir()
def test_prepend_issue_27401_makedirs(self, base_dir):
    '''
    file.prepend with optional directory creation, including creation of
    the target file when it does not exist yet
    '''
    fname = 'prepend_issue_27401'

    # 1) Missing file directly below the base directory
    target = os.path.join(base_dir, fname)
    outcome = self.run_state(
        'file.prepend', name=target, text='cheese', makedirs=True
    )
    self.assertSaltTrueReturn(outcome)

    # 2) Missing file inside a directory that does not exist yet
    target = os.path.join(base_dir, 'issue_27401', fname)
    outcome = self.run_state(
        'file.prepend', name=target, text='cheese', makedirs=True
    )
    self.assertSaltTrueReturn(outcome)

    # 3) Parent directory is there by now, the file is not, and
    #    makedirs is left out
    target = os.path.join(base_dir, 'issue_27401', fname + '2')
    outcome = self.run_state('file.prepend', name=target, text='cheese')
    self.assertSaltTrueReturn(outcome)
    self.assertTrue(os.path.isfile(target))
@with_tempfile()
def test_touch(self, name):
    '''
    file.touch: the path is a regular file afterwards and the state
    reports success
    '''
    outcome = self.run_state('file.touch', name=name)
    is_regular_file = os.path.isfile(name)
    self.assertTrue(is_regular_file)
    self.assertSaltTrueReturn(outcome)
@with_tempfile(create=False)
def test_test_touch(self, name):
    '''
    file.touch with test=True: no file is created and the result is None
    '''
    dry_run = self.run_state('file.touch', test=True, name=name)
    is_regular_file = os.path.isfile(name)
    self.assertFalse(is_regular_file)
    self.assertSaltNoneReturn(dry_run)
@with_tempdir()
def test_touch_directory(self, base_dir):
    '''
    file.touch applied to an existing directory: the state succeeds and
    the path is still a directory
    '''
    target_dir = os.path.join(base_dir, 'touch_test_dir')
    os.mkdir(target_dir)

    outcome = self.run_state('file.touch', name=target_dir)

    self.assertSaltTrueReturn(outcome)
    self.assertTrue(os.path.isdir(target_dir))
@with_tempdir()
def test_issue_2227_file_append(self, base_dir):
    '''
    Appending text that contains a percent symbol must work and must not
    be repeated on a second run
    '''
    tmp_file_append = os.path.join(base_dir, 'test.append')

    # Use existing states to build a file with some content to test
    # against
    self.run_state('file.touch', name=tmp_file_append)
    for source in ('salt://testappend/firstif', 'salt://testappend/secondif'):
        self.run_state(
            'file.append',
            name=tmp_file_append,
            source=source)

    # The real test: append the same text twice and take a snapshot of
    # the file after each run
    try:
        snapshots = []
        for _ in range(2):
            outcome = self.run_state(
                'file.append',
                name=tmp_file_append,
                text="HISTTIMEFORMAT='%F %T '")
            self.assertSaltTrueReturn(outcome)
            with salt.utils.files.fopen(tmp_file_append, 'r') as handle:
                snapshots.append(handle.read())

        # The second run must not have appended the text again
        contents_pre, contents_post = snapshots
        self.assertEqual(contents_pre, contents_post)
    except AssertionError:
        # Keep a copy of the file next to it before the failure propagates
        if os.path.exists(tmp_file_append):
            shutil.copy(tmp_file_append, tmp_file_append + '.bak')
        raise
@with_tempdir()
def test_issue_2401_file_comment(self, base_dir):
    '''
    Run a file.comment state from an sls string twice: the first run
    comments the line, the second run reports it as already commented
    '''
    tmp_file = os.path.join(base_dir, 'issue-2041-comment.txt')

    # Seed the target file
    with salt.utils.files.fopen(tmp_file, 'w') as handle:
        handle.write('hello\nworld\n')

    # Build the sls template.
    # NOTE(review): the leading whitespace inside these literals looks
    # collapsed in this copy of the file - confirm the YAML indentation.
    template = '\n'.join((
        '{0}:'.format(tmp_file),
        ' file.comment:',
        ' - regex: ^world',
    ))

    try:
        # First run: the line gets commented
        first = self.run_function(
            'state.template_str', [template], timeout=120
        )
        self.assertSaltTrueReturn(first)
        self.assertNotInSaltComment('Pattern already commented', first)
        self.assertInSaltComment('Commented lines successfully', first)

        # Second run: nothing left to do
        second = self.run_function(
            'state.template_str', [template], timeout=120
        )
        self.assertSaltTrueReturn(second)
        self.assertInSaltComment('Pattern already commented', second)
    except AssertionError:
        # Keep a copy of the file next to it before the failure propagates
        shutil.copy(tmp_file, tmp_file + '.bak')
        raise
@with_tempdir()
def test_issue_2379_file_append(self, base_dir):
    '''
    file.append must add a line even though commented variants of that
    line are already present in the file
    '''
    tmp_file = os.path.join(base_dir, 'issue-2379-file-append.txt')

    seed_content = (
        'hello\nworld\n'            # some junk
        '#PermitRootLogin yes\n'    # commented text
        '# PermitRootLogin yes\n'   # commented text with a space
    )
    with salt.utils.files.fopen(tmp_file, 'w') as handle:
        handle.write(seed_content)

    # Build the sls template.
    # NOTE(review): the leading whitespace inside these literals looks
    # collapsed in this copy of the file - confirm the YAML indentation.
    template = '\n'.join((
        '{0}:'.format(tmp_file),
        ' file.append:',
        ' - text: PermitRootLogin yes',
    ))

    try:
        outcome = self.run_function('state.template_str', [template])
        self.assertSaltTrueReturn(outcome)
        self.assertInSaltComment('Appended 1 lines', outcome)
    except AssertionError:
        # Keep a copy of the file next to it before the failure propagates
        shutil.copy(tmp_file, tmp_file + '.bak')
        raise
@skipIf(IS_WINDOWS, 'Mode not available in Windows')
@with_tempdir(create=False)
@with_tempdir(create=False)
def test_issue_2726_mode_kwarg(self, dir1, dir2):
    '''
    file.recurse must reject a plain 'mode' argument with a helpful
    comment and accept 'dir_mode' / 'file_mode' instead
    '''
    # NOTE(review): the leading whitespace inside the YAML literals below
    # looks collapsed in this copy of the file - confirm the indentation.

    # --- wrong usage: 'mode' ---
    bad_target = os.path.join(dir1, 'bad_mode_kwarg', 'testfile')
    bad_sls = os.linesep.join([
        '{0}:'.format(bad_target),
        ' file.recurse:',
        ' - source: salt://testfile',
        ' - mode: 644'
    ])
    bad_ret = self.run_function('state.template_str', [bad_sls])
    self.assertSaltFalseReturn(bad_ret)
    self.assertInSaltComment(
        '\'mode\' is not allowed in \'file.recurse\'. Please use '
        '\'file_mode\' and \'dir_mode\'.',
        bad_ret
    )
    self.assertNotInSaltComment(
        'TypeError: managed() got multiple values for keyword '
        'argument \'mode\'',
        bad_ret
    )

    # --- correct usage: 'dir_mode' and 'file_mode' ---
    good_target = os.path.join(dir2, 'good_mode_kwargs', 'testappend')
    good_sls = os.linesep.join([
        '{0}:'.format(good_target),
        ' file.recurse:',
        ' - source: salt://testappend',
        ' - dir_mode: 744',
        ' - file_mode: 644',
    ])
    good_ret = self.run_function('state.template_str', [good_sls])
    self.assertSaltTrueReturn(good_ret)
@with_tempdir()
def test_issue_8343_accumulated_require_in(self, base_dir):
    '''
    Two file.accumulated states feed, via require_in, two
    file.blockreplace states (one prepending, one appending) that manage
    the same file; the resulting file content is checked line by line
    '''
    template_path = os.path.join(TMP_STATE_TREE, 'issue-8343.sls')
    testcase_filedest = os.path.join(base_dir, 'issue-8343.txt')

    # Get rid of leftovers, if there are any
    for stale in (template_path, testcase_filedest):
        if os.path.exists(stale):
            os.remove(stale)

    # NOTE(review): the leading whitespace inside these YAML literals
    # looks collapsed in this copy of the file - confirm the indentation.
    sls_lines = [
        '{0}:',
        ' file.managed:',
        ' - contents: |',
        ' #',
        '',
        'prepend-foo-accumulator-from-pillar:',
        ' file.accumulated:',
        ' - require_in:',
        ' - file: prepend-foo-management',
        ' - filename: {0}',
        ' - text: |',
        ' foo',
        '',
        'append-foo-accumulator-from-pillar:',
        ' file.accumulated:',
        ' - require_in:',
        ' - file: append-foo-management',
        ' - filename: {0}',
        ' - text: |',
        ' bar',
        '',
        'prepend-foo-management:',
        ' file.blockreplace:',
        ' - name: {0}',
        ' - marker_start: "#-- start salt managed zonestart -- PLEASE, DO NOT EDIT"',
        ' - marker_end: "#-- end salt managed zonestart --"',
        " - content: ''",
        ' - prepend_if_not_found: True',
        " - backup: '.bak'",
        ' - show_changes: True',
        '',
        'append-foo-management:',
        ' file.blockreplace:',
        ' - name: {0}',
        ' - marker_start: "#-- start salt managed zoneend -- PLEASE, DO NOT EDIT"',
        ' - marker_end: "#-- end salt managed zoneend --"',
        " - content: ''",
        ' - append_if_not_found: True',
        " - backup: '.bak2'",
        ' - show_changes: True',
        '']
    rendered_sls = os.linesep.join(sls_lines).format(testcase_filedest)
    with salt.utils.files.fopen(template_path, 'w') as handle:
        handle.write(rendered_sls)

    ret = self.run_function('state.sls', mods='issue-8343')
    # Each individual state has to report success
    for state_id, state_ret in six.iteritems(ret):
        self.assertSaltTrueReturn({state_id: state_ret})

    with salt.utils.files.fopen(testcase_filedest) as handle:
        contents = handle.read().split(os.linesep)

    expected = [
        '#-- start salt managed zonestart -- PLEASE, DO NOT EDIT',
        'foo',
        '#-- end salt managed zonestart --',
        '#',
        '#-- start salt managed zoneend -- PLEASE, DO NOT EDIT',
        'bar',
        '#-- end salt managed zoneend --',
        '']
    expected_native = [
        salt.utils.stringutils.to_str(line) for line in expected
    ]
    self.assertEqual(expected_native, contents)
@with_tempdir()
def test_issue_11003_immutable_lazy_proxy_sum(self, base_dir):
    '''
    Two file.accumulated states (one wired with require_in, one with
    watch_in) feed a single file.blockreplace state; the managed block
    must contain exactly an empty line, 'bar' and 'baz', in any order
    '''
    # causes the Import-Module ServerManager error on Windows
    template_path = os.path.join(TMP_STATE_TREE, 'issue-11003.sls')
    testcase_filedest = os.path.join(base_dir, 'issue-11003.txt')

    # NOTE(review): the leading whitespace inside these YAML literals
    # looks collapsed in this copy of the file - confirm the indentation.
    sls_lines = [
        'a{0}:',
        ' file.absent:',
        ' - name: {0}',
        '',
        '{0}:',
        ' file.managed:',
        ' - contents: |',
        ' #',
        '',
        'test-acc1:',
        ' file.accumulated:',
        ' - require_in:',
        ' - file: final',
        ' - filename: {0}',
        ' - text: |',
        ' bar',
        '',
        'test-acc2:',
        ' file.accumulated:',
        ' - watch_in:',
        ' - file: final',
        ' - filename: {0}',
        ' - text: |',
        ' baz',
        '',
        'final:',
        ' file.blockreplace:',
        ' - name: {0}',
        ' - marker_start: "#-- start managed zone PLEASE, DO NOT EDIT"',
        ' - marker_end: "#-- end managed zone"',
        ' - content: \'\'',
        ' - append_if_not_found: True',
        ' - show_changes: True'
    ]
    rendered_sls = os.linesep.join(sls_lines).format(testcase_filedest)
    with salt.utils.files.fopen(template_path, 'w') as handle:
        handle.write(rendered_sls)

    ret = self.run_function('state.sls', mods='issue-11003')
    # Each individual state has to report success
    for state_id, state_ret in six.iteritems(ret):
        self.assertSaltTrueReturn({state_id: state_ret})

    with salt.utils.files.fopen(testcase_filedest) as handle:
        contents = handle.read().split(os.linesep)

    # Cut out the lines between the two markers
    first_inner = contents.index(
        '#-- start managed zone PLEASE, DO NOT EDIT') + 1
    end_marker = contents.index('#-- end managed zone')
    block_contents = contents[first_inner:end_marker]

    # Removing each expected line once must leave nothing behind;
    # list.remove() raises ValueError when an expected line is missing
    for wanted in ('', 'bar', 'baz'):
        block_contents.remove(wanted)
    self.assertEqual(block_contents, [])
@with_tempdir()
def test_issue_8947_utf8_sls(self, base_dir):
    '''
    Test some file operation with utf-8 characters on the sls

    This is more generic than just a file test. Feel free to move
    '''
    self.maxDiff = None
    korean_1 = '한국어 시험'
    korean_2 = '첫 번째 행'
    korean_3 = '마지막 행'
    test_file = os.path.join(base_dir, '{0}.txt'.format(korean_1))
    template_path = os.path.join(TMP_STATE_TREE, 'issue-8947.sls')

    # Placeholders are passed explicitly instead of format(**locals()),
    # so the template no longer depends on whatever locals happen to
    # exist at this point.
    template_vars = {
        'test_file': test_file,
        'korean_1': korean_1,
        'korean_2': korean_2,
        'korean_3': korean_3,
    }

    # create the sls template
    # NOTE(review): the YAML indentation inside the two template literals
    # below was not preserved in the reviewed copy of this file and has
    # been reconstructed - confirm it against the original.
    template = textwrap.dedent('''\
        some-utf8-file-create:
          file.managed:
            - name: {test_file}
            - contents: {korean_1}
            - makedirs: True
            - replace: True
            - show_diff: True
        some-utf8-file-create2:
          file.managed:
            - name: {test_file}
            - contents: |
                {korean_2}
                {korean_1}
                {korean_3}
            - replace: True
            - show_diff: True
        '''.format(**template_vars))

    if not salt.utils.platform.is_windows():
        template += textwrap.dedent('''\
            some-utf8-file-content-test:
              cmd.run:
                - name: 'cat "{test_file}"'
                - require:
                  - file: some-utf8-file-create2
            '''.format(**template_vars))

    # Save template file
    with salt.utils.files.fopen(template_path, 'wb') as fp_:
        fp_.write(salt.utils.stringutils.to_bytes(template))

    try:
        result = self.run_function('state.sls', mods='issue-8947')
        if not isinstance(result, dict):
            raise AssertionError(
                ('Something went really wrong while testing this sls:'
                 ' {0}').format(repr(result))
            )
        # difflib produces different output on python 2.6 than on >=2.7
        if sys.version_info < (2, 7):
            diff = '--- \n+++ \n@@ -1,1 +1,3 @@\n'
        else:
            diff = '--- \n+++ \n@@ -1 +1,3 @@\n'
        diff += (
            '+첫 번째 행{0}'
            ' 한국어 시험{0}'
            '+마지막 행{0}'
        ).format(os.linesep)

        # Index the returns by state ID (second field of the result key)
        ret = {x.split('_|-')[1]: y for x, y in six.iteritems(result)}

        # Confirm initial creation of file
        self.assertEqual(
            ret['some-utf8-file-create']['comment'],
            'File {0} updated'.format(test_file)
        )
        self.assertEqual(
            ret['some-utf8-file-create']['changes'],
            {'diff': 'New file'}
        )

        # Confirm file was modified and that the diff was as expected
        self.assertEqual(
            ret['some-utf8-file-create2']['comment'],
            'File {0} updated'.format(test_file)
        )
        self.assertEqual(
            ret['some-utf8-file-create2']['changes'],
            {'diff': diff}
        )

        if salt.utils.platform.is_windows():
            import subprocess
            import win32api
            p = subprocess.Popen(
                salt.utils.stringutils.to_str(
                    'type {}'.format(win32api.GetShortPathName(test_file))),
                shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            # communicate() drains both pipes and waits for the process
            # to exit; poll() only checks the state without waiting.
            out, _ = p.communicate()
            self.assertEqual(
                out.decode('utf-8'),
                os.linesep.join((korean_2, korean_1, korean_3)) + os.linesep
            )
        else:
            self.assertEqual(
                ret['some-utf8-file-content-test']['comment'],
                'Command "cat "{0}"" run'.format(test_file)
            )
            self.assertEqual(
                ret['some-utf8-file-content-test']['changes']['stdout'],
                '\n'.join((korean_2, korean_1, korean_3))
            )
    finally:
        # Best-effort removal of the sls written above
        try:
            os.remove(template_path)
        except OSError:
            pass
@skip_if_not_root
@skipIf(not HAS_PWD, "pwd not available. Skipping test")
@skipIf(not HAS_GRP, "grp not available. Skipping test")
@with_system_user_and_group('user12209', 'group12209',
                            on_existing='delete', delete=True)
@with_tempdir()
def test_issue_12209_follow_symlinks(self, tempdir, user, group):
    '''
    Ownership handling of file.directory when recursing with
    follow_symlinks=True
    '''
    # Layout: a real directory plus a symlink pointing at it
    real_dir = os.path.join(tempdir, 'one')
    link_path = os.path.join(tempdir, 'two')
    os.mkdir(real_dir)
    os.symlink(real_dir, link_path)

    outcome = self.run_state(
        'file.directory',
        name=tempdir,
        follow_symlinks=True,
        user=user,
        group=group,
        recurse=['user', 'group']
    )
    self.assertSaltTrueReturn(outcome)

    # Double-check on disk, in case the state mis-reported a True result.
    # With symlinks being followed, the link itself is expected to still
    # belong to root while the real directory belongs to the test user.
    real_stats = os.stat(real_dir)
    link_stats = os.lstat(link_path)
    self.assertEqual(pwd.getpwuid(real_stats.st_uid).pw_name, user)
    self.assertEqual(pwd.getpwuid(link_stats.st_uid).pw_name, 'root')
    self.assertEqual(grp.getgrgid(real_stats.st_gid).gr_name, group)
    # The group of the link is only compared when an 'id' binary is found
    if salt.utils.path.which('id'):
        root_group = self.run_function('user.primary_group', ['root'])
        self.assertEqual(grp.getgrgid(link_stats.st_gid).gr_name, root_group)
@skip_if_not_root
@skipIf(not HAS_PWD, "pwd not available. Skipping test")
@skipIf(not HAS_GRP, "grp not available. Skipping test")
@with_system_user_and_group('user12209', 'group12209',
                            on_existing='delete', delete=True)
@with_tempdir()
def test_issue_12209_no_follow_symlinks(self, tempdir, user, group):
    '''
    Ownership handling of file.directory when recursing with
    follow_symlinks=False
    '''
    # Layout: a real directory plus a symlink pointing at it
    real_dir = os.path.join(tempdir, 'one')
    link_path = os.path.join(tempdir, 'two')
    os.mkdir(real_dir)
    os.symlink(real_dir, link_path)

    outcome = self.run_state(
        'file.directory',
        name=tempdir,
        follow_symlinks=False,
        user=user,
        group=group,
        recurse=['user', 'group']
    )
    self.assertSaltTrueReturn(outcome)

    # Double-check on disk, in case the state mis-reported a True result.
    # With symlinks not being followed, the link itself is expected to
    # belong to the test user and group, just like the real directory.
    real_stats = os.stat(real_dir)
    link_stats = os.lstat(link_path)
    for entry_stats in (real_stats, link_stats):
        self.assertEqual(pwd.getpwuid(entry_stats.st_uid).pw_name, user)
    for entry_stats in (real_stats, link_stats):
        self.assertEqual(grp.getgrgid(entry_stats.st_gid).gr_name, group)
@with_tempfile(create=False)
@with_tempfile()
def test_template_local_file(self, source, dest):
    '''
    file.managed with a jinja template that lives on the local
    filesystem, referenced once with the file:// prefix and once as a
    plain path
    '''
    with salt.utils.files.fopen(source, 'w') as handle:
        handle.write('{{ foo }}\n')

    source_variants = ['file://' + source, '' + source]
    for source_ref in source_variants:
        outcome = self.run_state(
            'file.managed',
            name=dest,
            source=source_ref,
            template='jinja',
            context={'foo': 'Hello world!'}
        )
        self.assertSaltTrueReturn(outcome)
@with_tempfile()
def test_template_local_file_noclobber(self, source):
    '''
    file.managed must fail with an explanatory comment when the local
    source path and the destination path are one and the same
    '''
    with salt.utils.files.fopen(source, 'w') as handle:
        handle.write('{{ foo }}\n')

    outcome = self.run_state(
        'file.managed',
        name=source,
        source=source,
        template='jinja',
        context={'foo': 'Hello world!'}
    )
    self.assertSaltFalseReturn(outcome)

    # Only the first entry of the return dict is inspected
    state_ret = outcome[next(iter(outcome))]
    self.assertIn(
        'Source file cannot be the same as destination',
        state_ret['comment']
    )
@with_tempfile(create=False)
@with_tempfile(create=False)
def test_issue_25250_force_copy_deletes(self, source, dest):
    '''
    file.copy with force=True must replace the target file, not delete it
    '''
    # Source and destination start out with different content
    hosts_fixture = os.path.join(FILES, 'hosts')
    cheese_fixture = os.path.join(FILES, 'file/base/cheese')
    shutil.copyfile(hosts_fixture, source)
    shutil.copyfile(cheese_fixture, dest)

    self.run_state('file.copy', name=dest, source=source, force=True)

    # The destination still exists and now equals the source
    self.assertTrue(os.path.exists(dest))
    self.assertTrue(filecmp.cmp(source, dest))

    for leftover in (source, dest):
        os.remove(leftover)
@destructiveTest
@with_tempfile()
def test_file_copy_make_dirs(self, source):
    '''
    ensure make_dirs creates correct user perms
    '''
    shutil.copyfile(os.path.join(FILES, 'hosts'), source)
    top_dir = os.path.join(TMP, 'dir1')
    sub_dir = os.path.join(top_dir, 'dir2')
    dest = os.path.join(sub_dir, 'copied_file.txt')

    user = 'salt'
    mode = '0644'
    # NOTE(review): the user is created here but never removed again -
    # confirm whether that is intended for this destructive test.
    self.run_function('user.add', [user])
    try:
        ret = self.run_state('file.copy', name=dest, source=source, user=user,
                             makedirs=True, mode=mode)
        # Fail right here, with the state's own return, when the copy did
        # not succeed
        self.assertSaltTrueReturn(ret)

        # The copied file and both created directories are checked
        for check in (dest, top_dir, sub_dir):
            user_check = self.run_function('file.get_user', [check])
            mode_check = self.run_function('file.get_mode', [check])
            self.assertEqual(user_check, user)
            self.assertEqual(
                salt.utils.files.normalize_mode(mode_check), mode
            )
    finally:
        # TMP is shared between tests, so the created tree must not be
        # left behind
        shutil.rmtree(top_dir, ignore_errors=True)
def test_contents_pillar_with_pillar_list(self):
    '''
    Regression test for
    https://github.com/saltstack/salt/issues/30934: applying the
    'file_contents_pillar' sls has to succeed
    '''
    outcome = self.run_function('state.sls', mods='file_contents_pillar')
    self.assertSaltTrueReturn(outcome)
@skip_if_not_root
@skipIf(not HAS_PWD, "pwd not available. Skipping test")
@skipIf(not HAS_GRP, "grp not available. Skipping test")
@with_system_user_and_group('test_setuid_user', 'test_setuid_group',
                            on_existing='delete', delete=True)
def test_owner_after_setuid(self, user, group):
    '''
    Test to check file user/group after setting setuid or setgid.
    Because Python os.chown() does reset the setuid/setgid to 0.
    https://github.com/saltstack/salt/pull/45257
    '''
    # Desired configuration.
    desired = {
        'file': os.path.join(TMP, 'file_with_setuid'),
        'user': user,
        'group': group,
        'mode': '4750'
    }

    try:
        # Run the state.
        ret = self.run_state(
            'file.managed', name=desired['file'],
            user=desired['user'], group=desired['group'], mode=desired['mode']
        )

        # Check result.
        file_stat = os.stat(desired['file'])
        result = {
            'user': pwd.getpwuid(file_stat.st_uid).pw_name,
            'group': grp.getgrgid(file_stat.st_gid).gr_name,
            'mode': oct(stat.S_IMODE(file_stat.st_mode))
        }

        self.assertSaltTrueReturn(ret)
        self.assertEqual(desired['user'], result['user'])
        self.assertEqual(desired['group'], result['group'])
        # oct() prefixes the value with '0' or '0o' depending on the
        # Python version; strip that prefix before comparing
        self.assertEqual(desired['mode'], result['mode'].lstrip('0Oo'))
    finally:
        # TMP is shared between tests, so the managed file must not be
        # left behind (best effort, same as in test_binary_contents)
        try:
            os.remove(desired['file'])
        except OSError:
            pass
def test_binary_contents(self):
    '''
    Managing a file whose ``contents`` is binary data must return success
    instead of causing a traceback.
    '''
    target = os.path.join(TMP, '1px.gif')
    try:
        state_ret = self.run_state('file.managed',
                                   name=target,
                                   contents=BINARY_FILE)
        self.assertSaltTrueReturn(state_ret)
    finally:
        # Best-effort removal; a missing file is not an error here.
        try:
            os.remove(target)
        except OSError:
            pass
@skip_if_not_root
@skipIf(not HAS_PWD, "pwd not available. Skipping test")
@skipIf(not HAS_GRP, "grp not available. Skipping test")
@with_system_user_and_group('user12209', 'group12209',
                            on_existing='delete', delete=True)
@with_tempdir()
def test_issue_48336_file_managed_mode_setuid(self, tempdir, user, group):
    '''
    Ensure the setuid mode requested from file.managed is still in place
    when the state also sets the owning user and group.
    '''
    target = os.path.join(tempdir, 'temp_file_issue_48336')

    # Apply mode and ownership in a single state run.
    state_ret = self.run_state(
        'file.managed',
        name=target,
        user=user,
        group=group,
        mode='4750',
    )
    self.assertSaltTrueReturn(state_ret)

    target_stats = os.stat(target)

    # Compare modes in Salt's normalized string form.
    raw_mode = six.text_type(oct(stat.S_IMODE(target_stats.st_mode)))
    self.assertEqual(salt.utils.files.normalize_mode(raw_mode), '4750')

    # Resolve the numeric ids back to names before comparing.
    owner_name = pwd.getpwuid(target_stats.st_uid).pw_name
    self.assertEqual(owner_name, user)
    group_name = grp.getgrgid(target_stats.st_gid).gr_name
    self.assertEqual(group_name, group)
@with_tempdir()
def test_issue_48557(self, tempdir):
    '''
    file.line in insert mode with ``after='test2'`` must place the new
    line between ``test2`` and ``test3``. The fixture and the expectation
    are both built with os.linesep.
    '''
    target = os.path.join(tempdir, 'temp_file_issue_48557')
    before = os.linesep.join(['test1', 'test2', 'test3', '']).encode('utf-8')
    after = os.linesep.join(
        ['test1', 'test2', 'test4', 'test3', '']
    ).encode('utf-8')

    with salt.utils.files.fopen(target, 'wb') as handle:
        handle.write(before)

    state_ret = self.run_state('file.line',
                               name=target,
                               after='test2',
                               mode='insert',
                               content='test4')
    self.assertSaltTrueReturn(state_ret)

    with salt.utils.files.fopen(target, 'rb') as handle:
        actual = handle.read()
    self.assertEqual(actual, after)
@with_tempfile()
def test_issue_50221(self, name):
    '''
    The ``issue-50221`` pillar value must be ``abc`` followed by three
    line separators, and after applying the ``issue-50221`` state (with
    ``name`` passed as pillar) the file at ``name`` must hold that text.
    '''
    expected = 'abc{0}{0}{0}'.format(os.linesep)

    ret = self.run_function(
        'pillar.get',
        ['issue-50221']
    )
    # unittest assertions are not stripped under ``python -O`` and show
    # both values on failure, unlike a bare ``assert``.
    self.assertEqual(ret, expected)

    ret = self.run_function(
        'state.apply',
        ['issue-50221'],
        pillar={
            'name': name
        },
    )
    self.assertSaltTrueReturn(ret)

    with salt.utils.files.fopen(name, 'r') as fp:
        contents = fp.read()
    self.assertEqual(contents, expected)
def test_managed_file_issue_51208(self):
    '''
    Test to ensure we can handle a file with escaped double-quotes: the
    managed file must match the ``issue-51208/vimrc.stub`` source.
    '''
    name = os.path.join(TMP, 'issue_51208.txt')
    try:
        ret = self.run_state(
            'file.managed', name=name, source='salt://issue-51208/vimrc.stub'
        )
        # Assert the state result first, so a failed state is reported as
        # such rather than as an error opening a file that was not written.
        self.assertSaltTrueReturn(ret)

        src = os.path.join(BASE_FILES, 'issue-51208', 'vimrc.stub')
        with salt.utils.files.fopen(src, 'r') as fp_:
            master_data = fp_.read()
        with salt.utils.files.fopen(name, 'r') as fp_:
            minion_data = fp_.read()
        self.assertEqual(master_data, minion_data)
    finally:
        # Best-effort cleanup of the file created in the shared TMP dir.
        try:
            os.remove(name)
        except OSError:
            pass
# Test case whose methods exercise the ``file.blockreplace`` state; the
# class-level attributes below are the shared fixtures.
- class BlockreplaceTest(ModuleCase, SaltReturnAssertsMixin):
# Markers passed to the state as marker_start / marker_end.
- marker_start = '# start'
- marker_end = '# end'
# Block body passed to the state as ``content``.
- content = dedent(six.text_type('''\
- Line 1 of block
- Line 2 of block
- '''))
# File bodies in various states: no block, a block with other contents,
# and a block that already holds ``content``.
# NOTE(review): as shown here, with_matching_block and
# with_matching_block_and_extra_newline are identical, and without_block
# lacks the blank line its *_explicit_* counterparts contain; blank lines
# inside these literals appear to have been lost -- confirm against the
# canonical file before relying on them.
- without_block = dedent(six.text_type('''\
- Hello world!
- # comment here
- '''))
- with_non_matching_block = dedent(six.text_type('''\
- Hello world!
- # start
- No match here
- # end
- # comment here
- '''))
- with_non_matching_block_and_marker_end_not_after_newline = dedent(six.text_type('''\
- Hello world!
- # start
- No match here# end
- # comment here
- '''))
- with_matching_block = dedent(six.text_type('''\
- Hello world!
- # start
- Line 1 of block
- Line 2 of block
- # end
- # comment here
- '''))
- with_matching_block_and_extra_newline = dedent(six.text_type('''\
- Hello world!
- # start
- Line 1 of block
- Line 2 of block
- # end
- # comment here
- '''))
- with_matching_block_and_marker_end_not_after_newline = dedent(six.text_type('''\
- Hello world!
- # start
- Line 1 of block
- Line 2 of block# end
- # comment here
- '''))
# The same kinds of fixtures spelled with explicit line endings (POSIX
# '\n' and Windows '\r\n'), used by the *_auto_line_separator tests.
- content_explicit_posix_newlines = ('Line 1 of block\n'
- 'Line 2 of block\n')
- content_explicit_windows_newlines = ('Line 1 of block\r\n'
- 'Line 2 of block\r\n')
- without_block_explicit_posix_newlines = ('Hello world!\n\n'
- '# comment here\n')
- without_block_explicit_windows_newlines = ('Hello world!\r\n\r\n'
- '# comment here\r\n')
# Expected file bodies after the block has been prepended.
- with_block_prepended_explicit_posix_newlines = ('# start\n'
- 'Line 1 of block\n'
- 'Line 2 of block\n'
- '# end\n'
- 'Hello world!\n\n'
- '# comment here\n')
- with_block_prepended_explicit_windows_newlines = ('# start\r\n'
- 'Line 1 of block\r\n'
- 'Line 2 of block\r\n'
- '# end\r\n'
- 'Hello world!\r\n\r\n'
- '# comment here\r\n')
# Expected file bodies after the block has been appended.
- with_block_appended_explicit_posix_newlines = ('Hello world!\n\n'
- '# comment here\n'
- '# start\n'
- 'Line 1 of block\n'
- 'Line 2 of block\n'
- '# end\n')
- with_block_appended_explicit_windows_newlines = ('Hello world!\r\n\r\n'
- '# comment here\r\n'
- '# start\r\n'
- 'Line 1 of block\r\n'
- 'Line 2 of block\r\n'
- '# end\r\n')
@staticmethod
def _write(dest, content):
    '''
    Write ``content`` to the file at ``dest`` in binary mode, converting
    it to bytes first.
    '''
    with salt.utils.files.fopen(dest, 'wb') as handle:
        payload = salt.utils.stringutils.to_bytes(content)
        handle.write(payload)
@staticmethod
def _read(src):
    '''
    Read the file at ``src`` in binary mode and return its content
    converted to unicode.
    '''
    with salt.utils.files.fopen(src, 'rb') as handle:
        raw = handle.read()
        return salt.utils.stringutils.to_unicode(raw)
@with_tempfile()
def test_prepend(self, name):
    '''
    Block is absent and prepend_if_not_found=True: the block must end up
    at the top of the file, and re-running the state must change nothing.
    '''
    expected = ''.join([
        self.marker_start, os.linesep,
        self.content,
        self.marker_end, os.linesep,
        self.without_block,
    ])

    # Pass 1 supplies content ending in a newline, pass 2 content without
    # one; the expected file is the same for both.
    for block_content in (self.content, self.content.rstrip('\r\n')):
        self._write(name, self.without_block)
        # The first run must report changes, the re-run must not.
        for check_changes in (self.assertTrue, self.assertFalse):
            ret = self.run_state('file.blockreplace',
                                 name=name,
                                 content=block_content,
                                 marker_start=self.marker_start,
                                 marker_end=self.marker_end,
                                 prepend_if_not_found=True)
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]['changes'])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_prepend_append_newline(self, name):
    '''
    Block is absent, prepend_if_not_found=True and append_newline=True.
    Content that already ends in a newline is followed by one more line
    separator before the end marker; content without a trailing newline
    ends up with exactly one.
    '''
    head = self.marker_start + os.linesep
    tail = self.marker_end + os.linesep + self.without_block
    # (content passed to the state, expected file body)
    passes = (
        # Pass 1: content ends in newline
        (self.content, head + self.content + os.linesep + tail),
        # Pass 2: content does not end in newline
        (self.content.rstrip('\r\n'), head + self.content + tail),
    )

    for block_content, expected in passes:
        self._write(name, self.without_block)
        # The first run must report changes, the re-run must not.
        for check_changes in (self.assertTrue, self.assertFalse):
            ret = self.run_state('file.blockreplace',
                                 name=name,
                                 content=block_content,
                                 marker_start=self.marker_start,
                                 marker_end=self.marker_end,
                                 prepend_if_not_found=True,
                                 append_newline=True)
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]['changes'])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_prepend_no_append_newline(self, name):
    '''
    Block is absent, prepend_if_not_found=True and append_newline=False.
    Content is written as given, so content without a trailing newline
    is directly followed by the end marker on the same line.
    '''
    head = self.marker_start + os.linesep
    tail = self.marker_end + os.linesep + self.without_block
    stripped = self.content.rstrip('\r\n')
    # (content passed to the state, expected file body)
    passes = (
        # Pass 1: content ends in newline
        (self.content, head + self.content + tail),
        # Pass 2: content does not end in newline
        (stripped, head + stripped + tail),
    )

    for block_content, expected in passes:
        self._write(name, self.without_block)
        # The first run must report changes, the re-run must not.
        for check_changes in (self.assertTrue, self.assertFalse):
            ret = self.run_state('file.blockreplace',
                                 name=name,
                                 content=block_content,
                                 marker_start=self.marker_start,
                                 marker_end=self.marker_end,
                                 prepend_if_not_found=True,
                                 append_newline=False)
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]['changes'])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_append(self, name):
    '''
    Block is absent and append_if_not_found=True: the block must end up
    at the end of the file, and re-running the state must change nothing.
    '''
    expected = ''.join([
        self.without_block,
        self.marker_start, os.linesep,
        self.content,
        self.marker_end, os.linesep,
    ])

    # Pass 1 supplies content ending in a newline, pass 2 content without
    # one; the expected file is the same for both.
    for block_content in (self.content, self.content.rstrip('\r\n')):
        self._write(name, self.without_block)
        # The first run must report changes, the re-run must not.
        for check_changes in (self.assertTrue, self.assertFalse):
            ret = self.run_state('file.blockreplace',
                                 name=name,
                                 content=block_content,
                                 marker_start=self.marker_start,
                                 marker_end=self.marker_end,
                                 append_if_not_found=True)
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]['changes'])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_append_append_newline(self, name):
    '''
    Block is absent, append_if_not_found=True and append_newline=True.
    Content that already ends in a newline is followed by one more line
    separator before the end marker; content without a trailing newline
    ends up with exactly one.
    '''
    head = self.without_block + self.marker_start + os.linesep
    tail = self.marker_end + os.linesep
    # (content passed to the state, expected file body)
    passes = (
        # Pass 1: content ends in newline
        (self.content, head + self.content + os.linesep + tail),
        # Pass 2: content does not end in newline
        (self.content.rstrip('\r\n'), head + self.content + tail),
    )

    for block_content, expected in passes:
        self._write(name, self.without_block)
        # The first run must report changes, the re-run must not.
        for check_changes in (self.assertTrue, self.assertFalse):
            ret = self.run_state('file.blockreplace',
                                 name=name,
                                 content=block_content,
                                 marker_start=self.marker_start,
                                 marker_end=self.marker_end,
                                 append_if_not_found=True,
                                 append_newline=True)
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]['changes'])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_append_no_append_newline(self, name):
    '''
    Block is absent, append_if_not_found=True and append_newline=False.
    Content is written as given, so content without a trailing newline
    is directly followed by the end marker on the same line.
    '''
    head = self.without_block + self.marker_start + os.linesep
    tail = self.marker_end + os.linesep
    stripped = self.content.rstrip('\r\n')
    # (content passed to the state, expected file body)
    passes = (
        # Pass 1: content ends in newline
        (self.content, head + self.content + tail),
        # Pass 2: content does not end in newline
        (stripped, head + stripped + tail),
    )

    for block_content, expected in passes:
        self._write(name, self.without_block)
        # The first run must report changes, the re-run must not.
        for check_changes in (self.assertTrue, self.assertFalse):
            ret = self.run_state('file.blockreplace',
                                 name=name,
                                 content=block_content,
                                 marker_start=self.marker_start,
                                 marker_end=self.marker_end,
                                 append_if_not_found=True,
                                 append_newline=False)
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]['changes'])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_prepend_auto_line_separator(self, name):
    '''
    Line separator auto-detection when prepending the block: the
    resulting file must use the line endings of the existing file, not
    those of the supplied content.
    '''
    # (initial file body, content passed to the state, expected file body)
    scenarios = (
        # POSIX newlines to Windows newlines
        (self.without_block_explicit_windows_newlines,
         self.content_explicit_posix_newlines,
         self.with_block_prepended_explicit_windows_newlines),
        # Windows newlines to POSIX newlines
        (self.without_block_explicit_posix_newlines,
         self.content_explicit_windows_newlines,
         self.with_block_prepended_explicit_posix_newlines),
    )

    for initial, block_content, expected in scenarios:
        self._write(name, initial)
        # The first run must report changes, the re-run must not.
        for check_changes in (self.assertTrue, self.assertFalse):
            ret = self.run_state('file.blockreplace',
                                 name=name,
                                 content=block_content,
                                 marker_start=self.marker_start,
                                 marker_end=self.marker_end,
                                 prepend_if_not_found=True)
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]['changes'])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_append_auto_line_separator(self, name):
    '''
    Line separator auto-detection when appending the block: the
    resulting file must use the line endings of the existing file, not
    those of the supplied content.
    '''
    # (initial file body, content passed to the state, expected file body)
    scenarios = (
        # POSIX newlines to Windows newlines
        (self.without_block_explicit_windows_newlines,
         self.content_explicit_posix_newlines,
         self.with_block_appended_explicit_windows_newlines),
        # Windows newlines to POSIX newlines
        (self.without_block_explicit_posix_newlines,
         self.content_explicit_windows_newlines,
         self.with_block_appended_explicit_posix_newlines),
    )

    for initial, block_content, expected in scenarios:
        self._write(name, initial)
        # The first run must report changes, the re-run must not.
        for check_changes in (self.assertTrue, self.assertFalse):
            ret = self.run_state('file.blockreplace',
                                 name=name,
                                 content=block_content,
                                 marker_start=self.marker_start,
                                 marker_end=self.marker_end,
                                 append_if_not_found=True)
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]['changes'])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_non_matching_block(self, name):
    '''
    The block exists but holds different contents: it must be replaced
    with the supplied content, and re-running must change nothing.
    '''
    # Pass 1 supplies content ending in a newline, pass 2 content without
    # one; the expected file is the same for both.
    for block_content in (self.content, self.content.rstrip('\r\n')):
        self._write(name, self.with_non_matching_block)
        # The first run must report changes, the re-run must not.
        for check_changes in (self.assertTrue, self.assertFalse):
            ret = self.run_state('file.blockreplace',
                                 name=name,
                                 content=block_content,
                                 marker_start=self.marker_start,
                                 marker_end=self.marker_end)
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]['changes'])
            self.assertEqual(self._read(name), self.with_matching_block)
@with_tempfile()
def test_non_matching_block_append_newline(self, name):
    '''
    The block exists but holds different contents, and append_newline is
    explicitly True. Content ending in a newline is expected to yield the
    ``with_matching_block_and_extra_newline`` body; content without one
    yields ``with_matching_block``.
    '''
    # (content passed to the state, expected file body)
    passes = (
        # Pass 1: content ends in newline
        (self.content, self.with_matching_block_and_extra_newline),
        # Pass 2: content does not end in newline
        (self.content.rstrip('\r\n'), self.with_matching_block),
    )

    for block_content, expected in passes:
        self._write(name, self.with_non_matching_block)
        # The first run must report changes, the re-run must not.
        for check_changes in (self.assertTrue, self.assertFalse):
            ret = self.run_state('file.blockreplace',
                                 name=name,
                                 content=block_content,
                                 marker_start=self.marker_start,
                                 marker_end=self.marker_end,
                                 append_newline=True)
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]['changes'])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_non_matching_block_no_append_newline(self, name):
    '''
    The block exists but holds different contents, and append_newline is
    explicitly False. Content ending in a newline yields
    ``with_matching_block``; content without one leaves the end marker on
    the same line as the last content line.
    '''
    # (content passed to the state, expected file body)
    passes = (
        # Pass 1: content ends in newline
        (self.content, self.with_matching_block),
        # Pass 2: content does not end in newline
        (self.content.rstrip('\r\n'),
         self.with_matching_block_and_marker_end_not_after_newline),
    )

    for block_content, expected in passes:
        self._write(name, self.with_non_matching_block)
        # The first run must report changes, the re-run must not.
        for check_changes in (self.assertTrue, self.assertFalse):
            ret = self.run_state('file.blockreplace',
                                 name=name,
                                 content=block_content,
                                 marker_start=self.marker_start,
                                 marker_end=self.marker_end,
                                 append_newline=False)
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]['changes'])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_non_matching_block_and_marker_not_after_newline(self, name):
    '''
    The block exists with different contents and its end marker is not
    directly preceded by a newline: the result must still be the
    ``with_matching_block`` body, and re-running must change nothing.
    '''
    initial = self.with_non_matching_block_and_marker_end_not_after_newline

    # Pass 1 supplies content ending in a newline, pass 2 content without
    # one; the expected file is the same for both.
    for block_content in (self.content, self.content.rstrip('\r\n')):
        self._write(name, initial)
        # The first run must report changes, the re-run must not.
        for check_changes in (self.assertTrue, self.assertFalse):
            ret = self.run_state('file.blockreplace',
                                 name=name,
                                 content=block_content,
                                 marker_start=self.marker_start,
                                 marker_end=self.marker_end)
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]['changes'])
            self.assertEqual(self._read(name), self.with_matching_block)
@with_tempfile()
def test_non_matching_block_and_marker_not_after_newline_append_newline(self, name):
    '''
    The block exists with different contents, its end marker is not
    directly preceded by a newline, and append_newline is explicitly
    True. Content ending in a newline is expected to yield the
    ``with_matching_block_and_extra_newline`` body; content without one
    yields ``with_matching_block``.
    '''
    initial = self.with_non_matching_block_and_marker_end_not_after_newline
    # (content passed to the state, expected file body)
    passes = (
        # Pass 1: content ends in newline
        (self.content, self.with_matching_block_and_extra_newline),
        # Pass 2: content does not end in newline
        (self.content.rstrip('\r\n'), self.with_matching_block),
    )

    for block_content, expected in passes:
        self._write(name, initial)
        # The first run must report changes, the re-run must not.
        for check_changes in (self.assertTrue, self.assertFalse):
            ret = self.run_state('file.blockreplace',
                                 name=name,
                                 content=block_content,
                                 marker_start=self.marker_start,
                                 marker_end=self.marker_end,
                                 append_newline=True)
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]['changes'])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_non_matching_block_and_marker_not_after_newline_no_append_newline(self, name):
    '''
    The block exists with different contents, its end marker is not
    directly preceded by a newline, and append_newline is explicitly
    False. Content ending in a newline yields ``with_matching_block``;
    content without one leaves the end marker on the same line as the
    last content line.
    '''
    initial = self.with_non_matching_block_and_marker_end_not_after_newline
    # (content passed to the state, expected file body)
    passes = (
        # Pass 1: content ends in newline
        (self.content, self.with_matching_block),
        # Pass 2: content does not end in newline
        (self.content.rstrip('\r\n'),
         self.with_matching_block_and_marker_end_not_after_newline),
    )

    for block_content, expected in passes:
        self._write(name, initial)
        # The first run must report changes, the re-run must not.
        for check_changes in (self.assertTrue, self.assertFalse):
            ret = self.run_state('file.blockreplace',
                                 name=name,
                                 content=block_content,
                                 marker_start=self.marker_start,
                                 marker_end=self.marker_end,
                                 append_newline=False)
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]['changes'])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_matching_block(self, name):
    '''
    Run blockreplace against a file whose block already matches the
    desired content. Neither the first run nor the re-run may report
    changes, and the file must stay untouched.
    '''
    unchanged = self.with_matching_block
    # Pass 1 uses content ending in a newline, pass 2 content without one
    for content in (self.content, self.content.rstrip('\r\n')):
        self._write(name, unchanged)
        # Initial run followed by a re-run; both must be no-ops
        for _ in range(2):
            ret = self.run_state('file.blockreplace',
                                 name=name,
                                 content=content,
                                 marker_start=self.marker_start,
                                 marker_end=self.marker_end)
            self.assertSaltTrueReturn(ret)
            self.assertFalse(ret[next(iter(ret))]['changes'])
            self.assertEqual(self._read(name), unchanged)
@with_tempfile()
def test_matching_block_append_newline(self, name):
    '''
    Run blockreplace with append_newline=True against a file whose block
    already matches. Content ending in a newline gains an extra newline
    (so the first run reports changes); content without a trailing newline
    leaves the file as it was.
    '''
    seed = self.with_matching_block
    # Each scenario: (content, assertion for the first run's changes,
    # expected file afterwards)
    scenarios = (
        # Pass 1: content ends in newline
        (self.content,
         self.assertTrue,
         self.with_matching_block_and_extra_newline),
        # Pass 2: content does not end in newline
        (self.content.rstrip('\r\n'),
         self.assertFalse,
         self.with_matching_block),
    )
    for content, check_first_run, expected in scenarios:
        self._write(name, seed)
        # The re-run must never report changes
        for check_changes in (check_first_run, self.assertFalse):
            ret = self.run_state('file.blockreplace',
                                 name=name,
                                 content=content,
                                 marker_start=self.marker_start,
                                 marker_end=self.marker_end,
                                 append_newline=True)
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]['changes'])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_matching_block_no_append_newline(self, name):
    '''
    Run blockreplace with append_newline=False against a file whose block
    already matches. When the content does not end in a newline the
    marker_end ends up not directly preceded by a newline, which counts as
    a change on the first run.
    '''
    seed = self.with_matching_block
    # Each scenario: (content, assertion for the first run's changes,
    # expected file afterwards)
    scenarios = (
        # Pass 1: content ends in newline
        (self.content,
         self.assertFalse,
         self.with_matching_block),
        # Pass 2: content does not end in newline
        (self.content.rstrip('\r\n'),
         self.assertTrue,
         self.with_matching_block_and_marker_end_not_after_newline),
    )
    for content, check_first_run, expected in scenarios:
        self._write(name, seed)
        # The re-run must never report changes
        for check_changes in (check_first_run, self.assertFalse):
            ret = self.run_state('file.blockreplace',
                                 name=name,
                                 content=content,
                                 marker_start=self.marker_start,
                                 marker_end=self.marker_end,
                                 append_newline=False)
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]['changes'])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_matching_block_and_marker_not_after_newline(self, name):
    '''
    Run blockreplace (append_newline left unset) against a file whose
    block matches but whose marker_end is not directly preceded by a
    newline. The first run fixes the file up, the re-run is a no-op.
    '''
    seed = self.with_matching_block_and_marker_end_not_after_newline
    expected = self.with_matching_block
    # Pass 1 uses content ending in a newline, pass 2 content without one
    for content in (self.content, self.content.rstrip('\r\n')):
        self._write(name, seed)
        # First run reports changes, the re-run must not
        for check_changes in (self.assertTrue, self.assertFalse):
            ret = self.run_state('file.blockreplace',
                                 name=name,
                                 content=content,
                                 marker_start=self.marker_start,
                                 marker_end=self.marker_end)
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]['changes'])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_matching_block_and_marker_not_after_newline_append_newline(self, name):
    '''
    Run blockreplace with append_newline=True against a file whose block
    matches but whose marker_end is not directly preceded by a newline.
    Content ending in a newline yields an extra newline in the file;
    content without a trailing newline does not.
    '''
    seed = self.with_matching_block_and_marker_end_not_after_newline
    # Each scenario: (content given to the state, expected file afterwards)
    scenarios = (
        # Pass 1: content ends in newline
        (self.content, self.with_matching_block_and_extra_newline),
        # Pass 2: content does not end in newline
        (self.content.rstrip('\r\n'), self.with_matching_block),
    )
    for content, expected in scenarios:
        self._write(name, seed)
        # First run reports changes, the re-run must not
        for check_changes in (self.assertTrue, self.assertFalse):
            ret = self.run_state('file.blockreplace',
                                 name=name,
                                 content=content,
                                 marker_start=self.marker_start,
                                 marker_end=self.marker_end,
                                 append_newline=True)
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]['changes'])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_matching_block_and_marker_not_after_newline_no_append_newline(self, name):
    '''
    Run blockreplace with append_newline=False against a file whose block
    matches but whose marker_end is not directly preceded by a newline.
    Content without a trailing newline already matches that layout, so
    only the newline-terminated content causes a change.
    '''
    seed = self.with_matching_block_and_marker_end_not_after_newline
    # Each scenario: (content, assertion for the first run's changes,
    # expected file afterwards)
    scenarios = (
        # Pass 1: content ends in newline
        (self.content,
         self.assertTrue,
         self.with_matching_block),
        # Pass 2: content does not end in newline
        (self.content.rstrip('\r\n'),
         self.assertFalse,
         self.with_matching_block_and_marker_end_not_after_newline),
    )
    for content, check_first_run, expected in scenarios:
        self._write(name, seed)
        # The re-run must never report changes
        for check_changes in (check_first_run, self.assertFalse):
            ret = self.run_state('file.blockreplace',
                                 name=name,
                                 content=content,
                                 marker_start=self.marker_start,
                                 marker_end=self.marker_end,
                                 append_newline=False)
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]['changes'])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_issue_49043(self, name):
    '''
    Apply the issue-49043 SLS, handing it the temp file path through
    pillar, and compare the diff reported by its blockreplace state with
    the expected three added lines (including non-ASCII content).
    '''
    ret = self.run_function('state.sls',
                            mods='issue-49043',
                            pillar={'name': name})
    log.error("ret = %s", repr(ret))
    expected_diff = '--- \n+++ \n@@ -0,0 +1,3 @@\n' + dedent('''\
        +#-- start managed zone --
        +äöü
        +#-- end managed zone --
        ''')
    # Key under which the blockreplace state's result is returned
    job = 'file_|-somefile-blockreplace_|-{}_|-blockreplace'.format(name)
    self.assertEqual(ret[job]['changes']['diff'], expected_diff)
class RemoteFileTest(ModuleCase, SaltReturnAssertsMixin):
    '''
    Uses a local tornado webserver to test http(s) file.managed states with and
    without skip_verify
    '''
    @classmethod
    def setUpClass(cls):
        '''
        Start the webserver and record the source URL and its expected hash
        '''
        cls.webserver = Webserver()
        cls.webserver.start()
        cls.source = cls.webserver.url('grail/scene33')
        if IS_WINDOWS:
            # CRLF vs LF causes a different hash on windows
            cls.source_hash = '21438b3d5fd2c0028bcab92f7824dc69'
        else:
            cls.source_hash = 'd2feb3beb323c79fc7a0f44f1408b4a3'

    @classmethod
    def tearDownClass(cls):
        '''
        Stop the webserver started in setUpClass
        '''
        cls.webserver.stop()

    @with_tempfile(create=False)
    def setUp(self, name):  # pylint: disable=arguments-differ
        '''
        Remember the temp path supplied by the decorator; the file itself is
        not created up front (create=False)
        '''
        self.name = name

    def tearDown(self):
        '''
        Remove the managed file, tolerating the case where it was never
        created
        '''
        try:
            os.remove(self.name)
        except OSError as exc:
            if exc.errno != errno.ENOENT:
                # Bare raise keeps the original traceback intact
                raise

    def run_state(self, *args, **kwargs):
        '''
        Run the state via the parent class and log the return for debugging
        '''
        ret = super(RemoteFileTest, self).run_state(*args, **kwargs)
        log.debug('ret = %s', ret)
        return ret

    def test_file_managed_http_source_no_hash(self):
        '''
        Test a remote file with no hash
        '''
        ret = self.run_state('file.managed',
                             name=self.name,
                             source=self.source,
                             skip_verify=False)
        # This should fail because no hash was provided
        self.assertSaltFalseReturn(ret)

    def test_file_managed_http_source(self):
        '''
        Test a remote file with a source_hash provided
        '''
        ret = self.run_state('file.managed',
                             name=self.name,
                             source=self.source,
                             source_hash=self.source_hash,
                             skip_verify=False)
        self.assertSaltTrueReturn(ret)

    def test_file_managed_http_source_skip_verify(self):
        '''
        Test a remote file using skip_verify
        '''
        ret = self.run_state('file.managed',
                             name=self.name,
                             source=self.source,
                             skip_verify=True)
        self.assertSaltTrueReturn(ret)

    def test_file_managed_keep_source_false_http(self):
        '''
        This test ensures that we properly clean the cached file if keep_source
        is set to False, for source files using an http:// URL
        '''
        # Run the state
        ret = self.run_state('file.managed',
                             name=self.name,
                             source=self.source,
                             source_hash=self.source_hash,
                             keep_source=False)
        ret = ret[next(iter(ret))]
        # unittest assertions instead of bare asserts, so the checks still
        # run when Python is started with -O
        self.assertIs(ret['result'], True)

        # Now make sure that the file is not cached
        result = self.run_function('cp.is_cached', [self.source])
        self.assertEqual(
            result, '', 'File is still cached at {0}'.format(result))
@skipIf(not salt.utils.path.which('patch'), 'patch is not installed')
class PatchTest(ModuleCase, SaltReturnAssertsMixin):
    '''
    Tests for the file.patch state, using patch files fetched from salt://
    and from a local webserver, both plain and jinja-templated.
    '''
    def _check_patch_version(self, min_version):
        '''
        Skip the calling test if the installed patch is older than
        ``min_version``.
        '''
        banner = self.run_function('cmd.run', ['patch --version']).splitlines()[0]
        # Use the last field of the banner line: with a banner such as
        # "GNU patch 2.7.6" the second field is the word "patch", not the
        # version, which made the comparison below meaningless.
        version = banner.split()[-1]
        if _LooseVersion(version) < _LooseVersion(min_version):
            self.skipTest('Minimum patch version required: {0}. '
                          'Patch version installed: {1}'.format(min_version, version))

    @classmethod
    def setUpClass(cls):
        '''
        Start the webserver and precompute the name, salt:// URL, http URL
        and hash of every patch file used by the tests
        '''
        cls.webserver = Webserver()
        cls.webserver.start()

        cls.numbers_patch_name = 'numbers.patch'
        cls.math_patch_name = 'math.patch'
        cls.all_patch_name = 'all.patch'
        cls.numbers_patch_template_name = cls.numbers_patch_name + '.jinja'
        cls.math_patch_template_name = cls.math_patch_name + '.jinja'
        cls.all_patch_template_name = cls.all_patch_name + '.jinja'

        cls.numbers_patch_path = 'patches/' + cls.numbers_patch_name
        cls.math_patch_path = 'patches/' + cls.math_patch_name
        cls.all_patch_path = 'patches/' + cls.all_patch_name
        cls.numbers_patch_template_path = (
            'patches/' + cls.numbers_patch_template_name)
        cls.math_patch_template_path = (
            'patches/' + cls.math_patch_template_name)
        cls.all_patch_template_path = (
            'patches/' + cls.all_patch_template_name)

        cls.numbers_patch = 'salt://' + cls.numbers_patch_path
        cls.math_patch = 'salt://' + cls.math_patch_path
        cls.all_patch = 'salt://' + cls.all_patch_path
        cls.numbers_patch_template = 'salt://' + cls.numbers_patch_template_path
        cls.math_patch_template = 'salt://' + cls.math_patch_template_path
        cls.all_patch_template = 'salt://' + cls.all_patch_template_path

        cls.numbers_patch_http = cls.webserver.url(cls.numbers_patch_path)
        cls.math_patch_http = cls.webserver.url(cls.math_patch_path)
        cls.all_patch_http = cls.webserver.url(cls.all_patch_path)
        cls.numbers_patch_template_http = (
            cls.webserver.url(cls.numbers_patch_template_path))
        cls.math_patch_template_http = (
            cls.webserver.url(cls.math_patch_template_path))
        cls.all_patch_template_http = (
            cls.webserver.url(cls.all_patch_template_path))

        patches_dir = os.path.join(FILES, 'file', 'base', 'patches')

        def patch_hash(filename):
            # Hash of the on-disk patch file, used as source_hash in tests
            return salt.utils.hashutils.get_hash(
                os.path.join(patches_dir, filename))

        cls.numbers_patch_hash = patch_hash(cls.numbers_patch_name)
        cls.math_patch_hash = patch_hash(cls.math_patch_name)
        cls.all_patch_hash = patch_hash(cls.all_patch_name)
        cls.numbers_patch_template_hash = patch_hash(
            cls.numbers_patch_template_name)
        cls.math_patch_template_hash = patch_hash(
            cls.math_patch_template_name)
        cls.all_patch_template_hash = patch_hash(
            cls.all_patch_template_name)

        # Values substituted into the jinja-templated patch files
        cls.context = {'two': 'two', 'ten': 10}

    @classmethod
    def tearDownClass(cls):
        '''
        Stop the webserver started in setUpClass
        '''
        cls.webserver.stop()

    def setUp(self):
        '''
        Create a new unpatched set of files
        '''
        self.base_dir = tempfile.mkdtemp(dir=TMP)
        os.makedirs(os.path.join(self.base_dir, 'foo', 'bar'))
        self.numbers_file = os.path.join(self.base_dir, 'foo', 'numbers.txt')
        self.math_file = os.path.join(self.base_dir, 'foo', 'bar', 'math.txt')
        with salt.utils.files.fopen(self.numbers_file, 'w') as fp_:
            fp_.write(textwrap.dedent('''\
                one
                two
                three
                1
                2
                3
                '''))
        with salt.utils.files.fopen(self.math_file, 'w') as fp_:
            fp_.write(textwrap.dedent('''\
                Five plus five is ten
                Four squared is sixteen
                '''))
        self.addCleanup(shutil.rmtree, self.base_dir, ignore_errors=True)

    # -- private helpers ---------------------------------------------------

    def _run_patch(self, **kwargs):
        '''
        Run the file.patch state with the given arguments
        '''
        return self.run_state('file.patch', **kwargs)

    def _assert_patch_applied(self, ret):
        '''
        Assert a True result with the "successfully applied" comment and
        return the single state's return data
        '''
        self.assertSaltTrueReturn(ret)
        ret = ret[next(iter(ret))]
        self.assertEqual(ret['comment'], 'Patch successfully applied')
        return ret

    def _assert_patch_already_applied(self, ret):
        '''
        Assert a True result with the "already applied" comment and no
        changes
        '''
        self.assertSaltTrueReturn(ret)
        ret = ret[next(iter(ret))]
        self.assertEqual(ret['comment'], 'Patch was already applied')
        self.assertEqual(ret['changes'], {})

    def _assert_patch_failed(self, ret, expected_comment):
        '''
        Assert a False result whose comment contains ``expected_comment`` and
        return the single state's return data
        '''
        self.assertSaltFalseReturn(ret)
        ret = ret[next(iter(ret))]
        self.assertIn(expected_comment, ret['comment'])
        return ret

    def _assert_remote_source_workflow(self, source_hash, **kwargs):
        '''
        Drive the three-step remote source scenario for the file.patch
        arguments in ``kwargs``: no hash (fails), with ``source_hash``
        (applies), then skip_verify=True (already applied)
        '''
        # Try without a source_hash and without skip_verify=True, this should
        # fail with a message about the source_hash
        self._assert_patch_failed(
            self._run_patch(**kwargs),
            'Unable to verify upstream hash')
        # Re-run the state with a source hash, it should now succeed
        self._assert_patch_applied(
            self._run_patch(source_hash=source_hash, **kwargs))
        # Re-run again, this time with no hash and skip_verify=True to test
        # skipping hash verification
        self._assert_patch_already_applied(
            self._run_patch(skip_verify=True, **kwargs))

    @staticmethod
    def _remove_quietly(path):
        '''
        Remove ``path``, ignoring the case where it no longer exists
        '''
        try:
            os.remove(path)
        except OSError as exc:
            if exc.errno != errno.ENOENT:
                raise

    def _new_reject_file(self):
        '''
        Create a temp file to receive rejects and schedule its removal, so
        the tests do not leave it behind
        '''
        reject_file = salt.utils.files.mkstemp()
        self.addCleanup(self._remove_quietly, reject_file)
        return reject_file

    # -- tests -------------------------------------------------------------

    def test_patch_single_file(self):
        '''
        Test file.patch using a patch applied to a single file
        '''
        kwargs = dict(name=self.numbers_file, source=self.numbers_patch)
        self._assert_patch_applied(self._run_patch(**kwargs))
        # Re-run the state, should succeed and report that the patch was
        # already applied, with no changes.
        self._assert_patch_already_applied(self._run_patch(**kwargs))

    def test_patch_directory(self):
        '''
        Test file.patch using a patch applied to a directory, with changes
        spanning multiple files.
        '''
        self._check_patch_version('2.6')
        kwargs = dict(name=self.base_dir, source=self.all_patch, strip=1)
        self._assert_patch_applied(self._run_patch(**kwargs))
        # Re-run the state, should succeed and report that the patch was
        # already applied, with no changes.
        self._assert_patch_already_applied(self._run_patch(**kwargs))

    def test_patch_strip_parsing(self):
        '''
        Test that we successfully parse -p/--strip when included in the options
        '''
        self._check_patch_version('2.6')
        kwargs = dict(name=self.base_dir, source=self.all_patch)
        # Run the state using -p1
        self._assert_patch_applied(
            self._run_patch(options='-p1', **kwargs))
        # Re-run the state using --strip=1
        self._assert_patch_already_applied(
            self._run_patch(options='--strip=1', **kwargs))
        # Re-run the state using --strip 1
        self._assert_patch_already_applied(
            self._run_patch(options='--strip 1', **kwargs))

    def test_patch_saltenv(self):
        '''
        Test that we attempt to download the patch from a non-base saltenv
        '''
        # This state will fail because we don't have a patch file in that
        # environment, but that is OK, we just want to test that we're looking
        # in an environment other than base.
        ret = self._run_patch(
            name=self.math_file,
            source=self.math_patch,
            saltenv='prod',
        )
        self.assertSaltFalseReturn(ret)
        ret = ret[next(iter(ret))]
        self.assertEqual(
            ret['comment'],
            "Source file {0} not found in saltenv 'prod'".format(self.math_patch)
        )

    def test_patch_single_file_failure(self):
        '''
        Test file.patch using a patch applied to a single file. This tests a
        failed patch.
        '''
        # Empty the file to ensure that the patch doesn't apply cleanly
        with salt.utils.files.fopen(self.numbers_file, 'w'):
            pass
        self._assert_patch_failed(
            self._run_patch(name=self.numbers_file,
                            source=self.numbers_patch),
            'Patch would not apply cleanly')
        # Test the reject_file option and ensure that the rejects are written
        # to the path specified.
        reject_file = self._new_reject_file()
        ret = self._assert_patch_failed(
            self._run_patch(name=self.numbers_file,
                            source=self.numbers_patch,
                            reject_file=reject_file,
                            strip=1),
            'Patch would not apply cleanly')
        self.assertIn(
            'saving rejects to file {0}'.format(reject_file),
            ret['comment']
        )

    def test_patch_directory_failure(self):
        '''
        Test file.patch using a patch applied to a directory, with changes
        spanning multiple files. This tests a failed patch.
        '''
        # Empty the file to ensure that the patch doesn't apply
        with salt.utils.files.fopen(self.math_file, 'w'):
            pass
        self._assert_patch_failed(
            self._run_patch(name=self.base_dir,
                            source=self.all_patch,
                            strip=1),
            'Patch would not apply cleanly')
        # Test the reject_file option and ensure that the rejects are written
        # to the path specified.
        reject_file = self._new_reject_file()
        ret = self._assert_patch_failed(
            self._run_patch(name=self.base_dir,
                            source=self.all_patch,
                            reject_file=reject_file,
                            strip=1),
            'Patch would not apply cleanly')
        self.assertIn(
            'saving rejects to file {0}'.format(reject_file),
            ret['comment']
        )

    def test_patch_single_file_remote_source(self):
        '''
        Test file.patch using a patch applied to a single file, with the patch
        coming from a remote source.
        '''
        self._assert_remote_source_workflow(
            self.math_patch_hash,
            name=self.math_file,
            source=self.math_patch_http,
        )

    def test_patch_directory_remote_source(self):
        '''
        Test file.patch using a patch applied to a directory, with changes
        spanning multiple files, and the patch file coming from a remote
        source.
        '''
        self._check_patch_version('2.6')
        self._assert_remote_source_workflow(
            self.all_patch_hash,
            name=self.base_dir,
            source=self.all_patch_http,
            strip=1,
        )

    def test_patch_single_file_template(self):
        '''
        Test file.patch using a patch applied to a single file, with jinja
        templating applied to the patch file.
        '''
        kwargs = dict(
            name=self.numbers_file,
            source=self.numbers_patch_template,
            template='jinja',
            context=self.context,
        )
        self._assert_patch_applied(self._run_patch(**kwargs))
        # Re-run the state, should succeed and report that the patch was
        # already applied, with no changes.
        self._assert_patch_already_applied(self._run_patch(**kwargs))

    def test_patch_directory_template(self):
        '''
        Test file.patch using a patch applied to a directory, with changes
        spanning multiple files, and with jinja templating applied to the patch
        file.
        '''
        self._check_patch_version('2.6')
        kwargs = dict(
            name=self.base_dir,
            source=self.all_patch_template,
            template='jinja',
            context=self.context,
            strip=1,
        )
        self._assert_patch_applied(self._run_patch(**kwargs))
        # Re-run the state, should succeed and report that the patch was
        # already applied, with no changes.
        self._assert_patch_already_applied(self._run_patch(**kwargs))

    def test_patch_single_file_remote_source_template(self):
        '''
        Test file.patch using a jinja-templated patch applied to a single
        file, with the patch coming from a remote source.
        '''
        self._assert_remote_source_workflow(
            self.math_patch_template_hash,
            name=self.math_file,
            source=self.math_patch_template_http,
            template='jinja',
            context=self.context,
        )

    def test_patch_directory_remote_source_template(self):
        '''
        Test file.patch using a jinja-templated patch applied to a directory,
        with changes spanning multiple files, and the patch file coming from
        a remote source.
        '''
        self._check_patch_version('2.6')
        self._assert_remote_source_workflow(
            self.all_patch_template_hash,
            name=self.base_dir,
            source=self.all_patch_template_http,
            template='jinja',
            context=self.context,
            strip=1,
        )

    def test_patch_test_mode(self):
        '''
        Test file.patch using test=True
        '''
        kwargs = dict(name=self.numbers_file, source=self.numbers_patch)

        # With test=True on an unpatched file the state must return None and
        # report the changes it would make
        ret = self._run_patch(test=True, **kwargs)
        self.assertSaltNoneReturn(ret)
        ret = ret[next(iter(ret))]
        self.assertEqual(ret['comment'], 'The patch would be applied')
        self.assertTrue(ret['changes'])

        # Apply the patch for real. We'll then be able to test below that we
        # exit with a True rather than a None result if test=True is used on an
        # already-applied patch.
        ret = self._assert_patch_applied(self._run_patch(**kwargs))
        self.assertTrue(ret['changes'])

        # Run again with test=True. Since the pre-check happens before we do
        # the __opts__['test'] check, we should exit with a True result just
        # the same as if we try to run this state on an already-patched file
        # *without* test=True.
        self._assert_patch_already_applied(
            self._run_patch(test=True, **kwargs))

        # Empty the file to ensure that the patch doesn't apply cleanly
        with salt.utils.files.fopen(self.numbers_file, 'w'):
            pass

        # Run again with test=True. Similar to the above run, we are testing
        # that we return before we reach the __opts__['test'] check. In this
        # case we should return a False result because we should already know
        # by this point that the patch will not apply cleanly.
        ret = self._assert_patch_failed(
            self._run_patch(test=True, **kwargs),
            'Patch would not apply cleanly')
        self.assertEqual(ret['changes'], {})
# Scratch file created before and removed after every WinFileTest test.
WIN_TEST_FILE = 'c:/testfile'


@destructiveTest
@skipIf(not IS_WINDOWS, 'windows test only')
class WinFileTest(ModuleCase):
    '''
    Test for the file state on Windows
    '''
    def setUp(self):
        '''
        Create the scratch file with known contents.
        '''
        self.run_state('file.managed', name=WIN_TEST_FILE, makedirs=True, contents='Only a test')

    def tearDown(self):
        '''
        Remove the scratch file.
        '''
        self.run_state('file.absent', name=WIN_TEST_FILE)

    def _assert_state_succeeded(self, ret):
        '''
        Fail unless ``ret`` is a non-empty state return dict in which every
        state reports a truthy ``result``.

        A plain ``assertTrue(ret)`` only checks that the return value is
        non-empty, so it cannot tell a failed state from a successful one.
        '''
        self.assertIsInstance(ret, dict, 'Unexpected state return: {0!r}'.format(ret))
        self.assertTrue(ret, 'The state run returned no results')
        for state_id, state_ret in ret.items():
            self.assertTrue(
                state_ret.get('result'),
                '{0} did not succeed: {1}'.format(state_id, state_ret.get('comment')),
            )

    def test_file_managed(self):
        '''
        Test file.managed on Windows
        '''
        # setUp created the file via file.managed; file.exists confirms it.
        self._assert_state_succeeded(self.run_state('file.exists', name=WIN_TEST_FILE))

    def test_file_copy(self):
        '''
        Test file.copy on Windows
        '''
        copy_target = 'c:/testfile_copy'
        try:
            ret = self.run_state('file.copy', name=copy_target, makedirs=True, source=WIN_TEST_FILE)
            self._assert_state_succeeded(ret)
        finally:
            # tearDown only removes WIN_TEST_FILE, so remove the copy here
            # to avoid leaving it behind on the test machine.
            self.run_state('file.absent', name=copy_target)

    def test_file_comment(self):
        '''
        Test file.comment on Windows
        '''
        self.run_state('file.comment', name=WIN_TEST_FILE, regex='^Only')
        with salt.utils.files.fopen(WIN_TEST_FILE, 'r') as fp_:
            self.assertTrue(fp_.read().startswith('#Only'))

    def test_file_replace(self):
        '''
        Test file.replace on Windows
        '''
        self.run_state('file.replace', name=WIN_TEST_FILE, pattern='test', repl='testing')
        with salt.utils.files.fopen(WIN_TEST_FILE, 'r') as fp_:
            self.assertIn('testing', fp_.read())

    def test_file_absent(self):
        '''
        Test file.absent on Windows
        '''
        ret = self.run_state('file.absent', name=WIN_TEST_FILE)
        self._assert_state_succeeded(ret)
|